You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/09/01 22:16:59 UTC

[1/3] hadoop git commit: Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem.

Repository: hadoop
Updated Branches:
  refs/heads/trunk c5281a85e -> 7996eca7d


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index bc98e2f..03569d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planne
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.After;
@@ -47,6 +48,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+/**
+ * Testing the class {@link InMemoryPlan}.
+ */
+@SuppressWarnings("checkstyle:nowhitespaceafter")
 public class TestInMemoryPlan {
 
   private String user = "yarn";
@@ -62,6 +67,7 @@ public class TestInMemoryPlan {
   private ReservationAgent agent;
   private Planner replanner;
   private RMContext context;
+  private long maxPeriodicity;
 
   @Before
   public void setUp() throws PlanningException {
@@ -72,7 +78,7 @@ public class TestInMemoryPlan {
 
     clock = mock(Clock.class);
     queueMetrics = mock(QueueMetrics.class);
-    policy = mock(SharingPolicy.class);
+    policy = new NoOverCommitPolicy();
     replanner = mock(Planner.class);
 
     when(clock.getTime()).thenReturn(1L);
@@ -95,15 +101,41 @@ public class TestInMemoryPlan {
 
   @Test
   public void testAddReservation() {
-    Plan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
-            ReservationSystemTestUtil.getNewReservationId();
+        ReservationSystemTestUtil.getNewReservationId();
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    ReservationAllocation rAllocation = createReservationAllocation
-            (reservationID, start, alloc);
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.addReservation(rAllocation, false);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    doAssertions(plan, rAllocation);
+    checkAllocation(plan, alloc, start, 0);
+  }
+
+  @Test
+  public void testAddPeriodicReservation() throws PlanningException {
+
+    maxPeriodicity = 100;
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, maxPeriodicity,
+        context, new UTCClock());
+
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 50 };
+    int start = 10;
+    long period = 20;
+    ReservationAllocation rAllocation = createReservationAllocation(
+        reservationID, start, alloc, String.valueOf(period));
+    // use a periodicity of 20 (the plan's max periodicity is 100)
+    rAllocation.setPeriodicity(period);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -111,32 +143,54 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     doAssertions(plan, rAllocation);
-    checkAllocation(plan, alloc, start);
+    checkAllocation(plan, alloc, start, period);
+
+    RLESparseResourceAllocation available =
+        plan.getAvailableResourceOverTime(user, reservationID, 150, 330, 50);
+    System.out.println(available);
   }
 
-  private void checkAllocation(Plan plan, int[] alloc, int start) {
+  private void checkAllocation(Plan plan, int[] alloc, int start,
+      long periodicity) {
+    long end = start + alloc.length;
+    if (periodicity > 0) {
+      end = end + maxPeriodicity;
+    }
     RLESparseResourceAllocation userCons =
-        plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
+        plan.getConsumptionForUserOverTime(user, start, end * 3);
 
     for (int i = 0; i < alloc.length; i++) {
-      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
-          plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
-          userCons.getCapacityAtTime(start + i));
+      // only one instance for non-periodic reservation
+      if (periodicity <= 0) {
+        Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+            plan.getTotalCommittedResources(start + i));
+        Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+            userCons.getCapacityAtTime(start + i));
+      } else {
+        // periodic reservations should repeat
+        long y = 0;
+        Resource res = Resource.newInstance(1024 * (alloc[i]), (alloc[i]));
+        while (y <= end * 2) {
+          Assert.assertEquals("At time: " + start + i + y, res,
+              plan.getTotalCommittedResources(start + i + y));
+          Assert.assertEquals(" At time: " + (start + i + y), res,
+              userCons.getCapacityAtTime(start + i + y));
+          y = y + periodicity;
+        }
+      }
     }
   }
 
   @Test
   public void testAddEmptyReservation() {
-    Plan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
-            ReservationSystemTestUtil.getNewReservationId();
+        ReservationSystemTestUtil.getNewReservationId();
     int[] alloc = {};
     int start = 100;
-    ReservationAllocation rAllocation = createReservationAllocation
-            (reservationID, start, alloc);
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -148,15 +202,14 @@ public class TestInMemoryPlan {
   @Test
   public void testAddReservationAlreadyExists() {
     // First add a reservation
-    Plan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
-            ReservationSystemTestUtil.getNewReservationId();
+        ReservationSystemTestUtil.getNewReservationId();
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    ReservationAllocation rAllocation = createReservationAllocation
-            (reservationID, start, alloc);
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -164,7 +217,7 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     doAssertions(plan, rAllocation);
-    checkAllocation(plan, alloc, start);
+    checkAllocation(plan, alloc, start, 0);
 
     // Try to add it again
     try {
@@ -180,16 +233,15 @@ public class TestInMemoryPlan {
 
   @Test
   public void testUpdateReservation() {
-    Plan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     // First add a reservation
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    ReservationAllocation rAllocation = createReservationAllocation
-            (reservationID, start, alloc);
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -210,8 +262,8 @@ public class TestInMemoryPlan {
     // Now update it
     start = 110;
     int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 };
-    rAllocation = createReservationAllocation(reservationID, start,
-            updatedAlloc, true);
+    rAllocation =
+        createReservationAllocation(reservationID, start, updatedAlloc, true);
     try {
       plan.updateReservation(rAllocation);
     } catch (PlanningException e) {
@@ -219,32 +271,71 @@ public class TestInMemoryPlan {
     }
     doAssertions(plan, rAllocation);
 
-    userCons =
-        plan.getConsumptionForUserOverTime(user, start, start
-            + updatedAlloc.length);
+    userCons = plan.getConsumptionForUserOverTime(user, start,
+        start + updatedAlloc.length);
 
     for (int i = 0; i < updatedAlloc.length; i++) {
-      Assert.assertEquals(
-     Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
-              + i), plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(
-          Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
-              + i), userCons.getCapacityAtTime(start + i));
+      Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
+          updatedAlloc[i] + i), plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
+          updatedAlloc[i] + i), userCons.getCapacityAtTime(start + i));
+    }
+  }
+
+  @Test
+  public void testUpdatePeriodicReservation() {
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    // First add a reservation
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 20 };
+    int start = 100;
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
+    // use periodicity of 1hr
+    long period = 3600000;
+    rAllocation.getReservationDefinition()
+        .setRecurrenceExpression(String.valueOf(period));
+    rAllocation.setPeriodicity(period);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.addReservation(rAllocation, false);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    System.out.println(plan.toString());
+    doAssertions(plan, rAllocation);
+    checkAllocation(plan, alloc, start, period);
+
+    // Now update it
+    start = 110;
+    int[] updatedAlloc = { 30, 40, 50 };
+    rAllocation =
+        createReservationAllocation(reservationID, start, updatedAlloc);
+    rAllocation.getReservationDefinition()
+        .setRecurrenceExpression(String.valueOf(period));
+    rAllocation.setPeriodicity(period);
+    try {
+      plan.updateReservation(rAllocation);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
     }
+    doAssertions(plan, rAllocation);
+    checkAllocation(plan, updatedAlloc, start, period);
   }
 
   @Test
   public void testUpdateNonExistingReservation() {
-    Plan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     // Try to update a reservation without adding
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     ReservationAllocation rAllocation =
-            createReservationAllocation(reservationID, start, alloc);
+        createReservationAllocation(reservationID, start, alloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.updateReservation(rAllocation);
@@ -260,15 +351,14 @@ public class TestInMemoryPlan {
   @Test
   public void testDeleteReservation() {
     // First add a reservation
-    Plan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     ReservationAllocation rAllocation =
-            createReservationAllocation(reservationID, start, alloc, true);
+        createReservationAllocation(reservationID, start, alloc, true);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -307,10 +397,46 @@ public class TestInMemoryPlan {
   }
 
   @Test
+  public void testDeletePeriodicReservation() {
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    // First add a reservation
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 20 };
+    int start = 100;
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
+    // use periodicity of 1hr
+    long period = 3600000;
+    rAllocation.getReservationDefinition()
+        .setRecurrenceExpression(String.valueOf(period));
+    rAllocation.setPeriodicity(period);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.addReservation(rAllocation, false);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    System.out.println(plan.toString());
+    doAssertions(plan, rAllocation);
+    checkAllocation(plan, alloc, start, period);
+
+    // Now delete it
+    try {
+      plan.deleteReservation(reservationID);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNull(plan.getReservationById(reservationID));
+    System.out.print(plan);
+    checkAllocation(plan, new int[] { 0, 0 }, start, period);
+  }
+
+  @Test
   public void testDeleteNonExistingReservation() {
-    Plan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     // Try to delete a reservation without adding
@@ -328,8 +454,9 @@ public class TestInMemoryPlan {
 
   @Test
   public void testArchiveCompletedReservations() {
+    SharingPolicy sharingPolicy = mock(SharingPolicy.class);
     Plan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        new InMemoryPlan(queueMetrics, sharingPolicy, agent, totalCapacity, 1L,
             resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID1 =
         ReservationSystemTestUtil.getNewReservationId();
@@ -337,7 +464,7 @@ public class TestInMemoryPlan {
     int[] alloc1 = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     ReservationAllocation rAllocation =
-            createReservationAllocation(reservationID1, start, alloc1);
+        createReservationAllocation(reservationID1, start, alloc1);
     Assert.assertNull(plan.getReservationById(reservationID1));
     try {
       plan.addReservation(rAllocation, false);
@@ -345,15 +472,14 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     doAssertions(plan, rAllocation);
-    checkAllocation(plan, alloc1, start);
-
+    checkAllocation(plan, alloc1, start, 0);
 
     // Now add another one
     ReservationId reservationID2 =
         ReservationSystemTestUtil.getNewReservationId();
     int[] alloc2 = { 0, 5, 10, 5, 0 };
     rAllocation =
-            createReservationAllocation(reservationID2, start, alloc2, true);
+        createReservationAllocation(reservationID2, start, alloc2, true);
     Assert.assertNull(plan.getReservationById(reservationID2));
     try {
       plan.addReservation(rAllocation, false);
@@ -367,16 +493,18 @@ public class TestInMemoryPlan {
 
     for (int i = 0; i < alloc2.length; i++) {
       Assert.assertEquals(
-          Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
-              + alloc2[i] + i), plan.getTotalCommittedResources(start + i));
+          Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
+              alloc1[i] + alloc2[i] + i),
+          plan.getTotalCommittedResources(start + i));
       Assert.assertEquals(
-          Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
-              + alloc2[i] + i), userCons.getCapacityAtTime(start + i));
+          Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
+              alloc1[i] + alloc2[i] + i),
+          userCons.getCapacityAtTime(start + i));
     }
 
     // Now archive completed reservations
     when(clock.getTime()).thenReturn(106L);
-    when(policy.getValidWindow()).thenReturn(1L);
+    when(sharingPolicy.getValidWindow()).thenReturn(1L);
     try {
       // will only remove 2nd reservation as only that has fallen out of the
       // archival window
@@ -386,7 +514,7 @@ public class TestInMemoryPlan {
     }
     Assert.assertNotNull(plan.getReservationById(reservationID1));
     Assert.assertNull(plan.getReservationById(reservationID2));
-    checkAllocation(plan, alloc1, start);
+    checkAllocation(plan, alloc1, start, 0);
 
     when(clock.getTime()).thenReturn(107L);
     try {
@@ -411,15 +539,14 @@ public class TestInMemoryPlan {
 
   @Test
   public void testGetReservationsById() {
-    Plan plan =
-            new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-                    resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
-            ReservationSystemTestUtil.getNewReservationId();
-    int[] alloc = {10, 10, 10, 10, 10, 10};
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    ReservationAllocation rAllocation = createReservationAllocation
-            (reservationID, start, alloc);
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -429,31 +556,30 @@ public class TestInMemoryPlan {
 
     // Verify that get by reservation id works.
     Set<ReservationAllocation> rAllocations =
-            plan.getReservations(reservationID, null, "");
+        plan.getReservations(reservationID, null, "");
     Assert.assertTrue(rAllocations.size() == 1);
-    Assert.assertTrue(rAllocation.compareTo(
-            (ReservationAllocation) rAllocations.toArray()[0]) == 0);
+    Assert.assertTrue(rAllocation
+        .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
 
     // Verify that get by reservation id works even when time range
     // and user is invalid.
     ReservationInterval interval = new ReservationInterval(0, 0);
     rAllocations = plan.getReservations(reservationID, interval, "invalid");
     Assert.assertTrue(rAllocations.size() == 1);
-    Assert.assertTrue(rAllocation.compareTo(
-            (ReservationAllocation) rAllocations.toArray()[0]) == 0);
+    Assert.assertTrue(rAllocation
+        .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
   }
 
   @Test
   public void testGetReservationsByInvalidId() {
-    Plan plan =
-            new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-                    resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
-            ReservationSystemTestUtil.getNewReservationId();
-    int[] alloc = {10, 10, 10, 10, 10, 10};
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    ReservationAllocation rAllocation = createReservationAllocation
-            (reservationID, start, alloc);
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -463,23 +589,22 @@ public class TestInMemoryPlan {
 
     // If reservationId is null, then nothing is returned.
     ReservationId invalidReservationID =
-            ReservationSystemTestUtil.getNewReservationId();
+        ReservationSystemTestUtil.getNewReservationId();
     Set<ReservationAllocation> rAllocations =
-            plan.getReservations(invalidReservationID, null, "");
+        plan.getReservations(invalidReservationID, null, "");
     Assert.assertTrue(rAllocations.size() == 0);
   }
 
   @Test
   public void testGetReservationsByTimeInterval() {
-    Plan plan =
-            new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-                    resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
-            ReservationSystemTestUtil.getNewReservationId();
-    int[] alloc = {10, 10, 10, 10, 10, 10};
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    ReservationAllocation rAllocation = createReservationAllocation
-            (reservationID, start, alloc);
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -489,23 +614,24 @@ public class TestInMemoryPlan {
 
     // Verify that get by time interval works if the selection interval
     // completely overlaps with the allocation.
-    ReservationInterval interval = new ReservationInterval(rAllocation
-            .getStartTime(), rAllocation.getEndTime());
+    ReservationInterval interval = new ReservationInterval(
+        rAllocation.getStartTime(), rAllocation.getEndTime());
     Set<ReservationAllocation> rAllocations =
-            plan.getReservations(null, interval, "");
+        plan.getReservations(null, interval, "");
     Assert.assertTrue(rAllocations.size() == 1);
-    Assert.assertTrue(rAllocation.compareTo(
-            (ReservationAllocation) rAllocations.toArray()[0]) == 0);
+    Assert.assertTrue(rAllocation
+        .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
 
     // Verify that get by time interval works if the selection interval
     // falls within the allocation
     long duration = rAllocation.getEndTime() - rAllocation.getStartTime();
-    interval = new ReservationInterval(rAllocation.getStartTime() + duration
-            * (long)0.3, rAllocation.getEndTime() - duration * (long)0.3);
+    interval = new ReservationInterval(
+        rAllocation.getStartTime() + duration * (long) 0.3,
+        rAllocation.getEndTime() - duration * (long) 0.3);
     rAllocations = plan.getReservations(null, interval, "");
     Assert.assertTrue(rAllocations.size() == 1);
-    Assert.assertTrue(rAllocation.compareTo(
-            (ReservationAllocation) rAllocations.toArray()[0]) == 0);
+    Assert.assertTrue(rAllocation
+        .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
 
     // Verify that get by time interval selects 1 allocation if the end
     // time of the selection interval falls right at the start of the
@@ -513,13 +639,13 @@ public class TestInMemoryPlan {
     interval = new ReservationInterval(0, rAllocation.getStartTime());
     rAllocations = plan.getReservations(null, interval, "");
     Assert.assertTrue(rAllocations.size() == 1);
-    Assert.assertTrue(rAllocation.compareTo(
-            (ReservationAllocation) rAllocations.toArray()[0]) == 0);
+    Assert.assertTrue(rAllocation
+        .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
 
     // Verify that get by time interval selects no reservations if the start
     // time of the selection interval falls right at the end of the allocation.
-    interval = new ReservationInterval(rAllocation
-            .getEndTime(), Long.MAX_VALUE);
+    interval =
+        new ReservationInterval(rAllocation.getEndTime(), Long.MAX_VALUE);
     rAllocations = plan.getReservations(null, interval, "");
     Assert.assertTrue(rAllocations.size() == 0);
 
@@ -532,15 +658,14 @@ public class TestInMemoryPlan {
 
   @Test
   public void testGetReservationsAtTime() {
-    Plan plan =
-            new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-                    resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
-            ReservationSystemTestUtil.getNewReservationId();
-    int[] alloc = {10, 10, 10, 10, 10, 10};
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    ReservationAllocation rAllocation = createReservationAllocation
-            (reservationID, start, alloc);
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -549,23 +674,22 @@ public class TestInMemoryPlan {
     }
 
     Set<ReservationAllocation> rAllocations =
-            plan.getReservationsAtTime(rAllocation.getStartTime());
+        plan.getReservationsAtTime(rAllocation.getStartTime());
     Assert.assertTrue(rAllocations.size() == 1);
-    Assert.assertTrue(rAllocation.compareTo(
-            (ReservationAllocation) rAllocations.toArray()[0]) == 0);
+    Assert.assertTrue(rAllocation
+        .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
   }
 
   @Test
   public void testGetReservationsWithNoInput() {
-    Plan plan =
-            new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-                    resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
-            ReservationSystemTestUtil.getNewReservationId();
-    int[] alloc = {10, 10, 10, 10, 10, 10};
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    ReservationAllocation rAllocation = createReservationAllocation
-            (reservationID, start, alloc);
+    ReservationAllocation rAllocation =
+        createReservationAllocation(reservationID, start, alloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation, false);
@@ -576,22 +700,21 @@ public class TestInMemoryPlan {
     // Verify that getReservations defaults to getting all reservations if no
     // reservationID, time interval, and user is provided,
     Set<ReservationAllocation> rAllocations =
-            plan.getReservations(null, null, "");
+        plan.getReservations(null, null, "");
     Assert.assertTrue(rAllocations.size() == 1);
-    Assert.assertTrue(rAllocation.compareTo(
-            (ReservationAllocation) rAllocations.toArray()[0]) == 0);
+    Assert.assertTrue(rAllocation
+        .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
   }
 
   @Test
   public void testGetReservationsWithNoReservation() {
-    Plan plan =
-            new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-                    resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     // Verify that get reservation returns no entries if no queries are made.
 
     ReservationInterval interval = new ReservationInterval(0, Long.MAX_VALUE);
     Set<ReservationAllocation> rAllocations =
-            plan.getReservations(null, interval, "");
+        plan.getReservations(null, interval, "");
     Assert.assertTrue(rAllocations.size() == 0);
   }
 
@@ -600,7 +723,9 @@ public class TestInMemoryPlan {
     Assert.assertNotNull(plan.getReservationById(reservationID));
     Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
     Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1);
-    Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
+    if (rAllocation.getPeriodicity() <= 0) {
+      Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
+    }
     Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
     Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
     Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
@@ -610,7 +735,8 @@ public class TestInMemoryPlan {
   }
 
   private ReservationDefinition createSimpleReservationDefinition(long arrival,
-      long deadline, long duration, Collection<ReservationRequest> resources) {
+      long deadline, long duration, Collection<ReservationRequest> resources,
+      String recurrenceExpression) {
     // create a request with a single atomic ask
     ReservationDefinition rDef = new ReservationDefinitionPBImpl();
     ReservationRequests reqs = new ReservationRequestsPBImpl();
@@ -619,6 +745,7 @@ public class TestInMemoryPlan {
     rDef.setReservationRequests(reqs);
     rDef.setArrival(arrival);
     rDef.setDeadline(deadline);
+    rDef.setRecurrenceExpression(recurrenceExpression);
     return rDef;
   }
 
@@ -633,31 +760,43 @@ public class TestInMemoryPlan {
       } else {
         numContainers = alloc[i];
       }
-      ReservationRequest rr =
-          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-              (numContainers));
+      ReservationRequest rr = ReservationRequest
+          .newInstance(Resource.newInstance(1024, 1), (numContainers));
       req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
     }
     return req;
   }
 
-  private ReservationAllocation createReservationAllocation(ReservationId
-            reservationID, int start, int[] alloc) {
-    return createReservationAllocation(reservationID, start, alloc, false);
+  private ReservationAllocation createReservationAllocation(
+      ReservationId reservationID, int start, int[] alloc) {
+    return createReservationAllocation(reservationID, start, alloc, false, "0");
+  }
+
+  private ReservationAllocation createReservationAllocation(
+      ReservationId reservationID, int start, int[] alloc, boolean isStep) {
+    return createReservationAllocation(reservationID, start, alloc, isStep,
+        "0");
+  }
+
+  private ReservationAllocation createReservationAllocation(
+      ReservationId reservationID, int start, int[] alloc,
+      String recurrenceExp) {
+    return createReservationAllocation(reservationID, start, alloc, false,
+        recurrenceExp);
   }
 
-  private ReservationAllocation createReservationAllocation(ReservationId
-            reservationID, int start, int[] alloc, boolean isStep) {
+  private ReservationAllocation createReservationAllocation(
+      ReservationId reservationID, int start, int[] alloc, boolean isStep,
+      String recurrenceExp) {
     Map<ReservationInterval, ReservationRequest> allocations =
-            generateAllocation(start, alloc, isStep);
+        generateAllocation(start, alloc, isStep);
     ReservationDefinition rDef =
-            createSimpleReservationDefinition(start, start + alloc.length,
-                    alloc.length, allocations.values());
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values(), recurrenceExp);
     Map<ReservationInterval, Resource> allocs =
-            ReservationSystemUtil.toResources(allocations);
+        ReservationSystemUtil.toResources(allocations);
     return new InMemoryReservationAllocation(reservationID, rDef, user,
-            planName,
-                    start, start + alloc.length, allocs, resCalc, minAlloc);
+        planName, start, start + alloc.length, allocs, resCalc, minAlloc);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java
index 554eb58..457e2ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java
@@ -19,26 +19,27 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Testing the class PeriodicRLESparseResourceAllocation.
+ * Testing the class {@link PeriodicRLESparseResourceAllocation}.
  */
+@SuppressWarnings("checkstyle:nowhitespaceafter")
 public class TestPeriodicRLESparseResourceAllocation {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestPeriodicRLESparseResourceAllocation.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestPeriodicRLESparseResourceAllocation.class);
 
   @Test
   public void testPeriodicCapacity() {
-    int[] alloc = {10, 7, 5, 2, 0};
-    long[] timeSteps = {0L, 5L, 10L, 15L, 19L};
-    RLESparseResourceAllocation rleSparseVector =
-        ReservationSystemTestUtil.generateRLESparseResourceAllocation(
-            alloc, timeSteps);
+    int[] alloc = { 10, 7, 5, 2, 0 };
+    long[] timeSteps = { 0L, 5L, 10L, 15L, 19L };
+    RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
+        .generateRLESparseResourceAllocation(alloc, timeSteps);
     PeriodicRLESparseResourceAllocation periodicVector =
         new PeriodicRLESparseResourceAllocation(rleSparseVector, 20L);
     LOG.info(periodicVector.toString());
@@ -54,43 +55,74 @@ public class TestPeriodicRLESparseResourceAllocation {
 
   @Test
   public void testMaxPeriodicCapacity() {
-    int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8};
-    long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
-    RLESparseResourceAllocation rleSparseVector =
-        ReservationSystemTestUtil.generateRLESparseResourceAllocation(
-            alloc, timeSteps);
+    int[] alloc = { 2, 5, 7, 10, 3, 4, 6, 8 };
+    long[] timeSteps = { 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L };
+    RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
+        .generateRLESparseResourceAllocation(alloc, timeSteps);
     PeriodicRLESparseResourceAllocation periodicVector =
         new PeriodicRLESparseResourceAllocation(rleSparseVector, 8L);
     LOG.info(periodicVector.toString());
-    Assert.assertEquals(
-        periodicVector.getMaximumPeriodicCapacity(0, 1),
+    Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(0, 1),
         Resource.newInstance(10, 10));
-    Assert.assertEquals(
-        periodicVector.getMaximumPeriodicCapacity(8, 2),
+    Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(8, 2),
         Resource.newInstance(7, 7));
-    Assert.assertEquals(
-        periodicVector.getMaximumPeriodicCapacity(16, 3),
+    Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(16, 3),
         Resource.newInstance(10, 10));
-    Assert.assertEquals(
-        periodicVector.getMaximumPeriodicCapacity(17, 4),
+    Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(17, 4),
         Resource.newInstance(5, 5));
-    Assert.assertEquals(
-        periodicVector.getMaximumPeriodicCapacity(32, 5),
+    Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(32, 5),
         Resource.newInstance(4, 4));
   }
 
   @Test
+  public void testMixPeriodicAndNonPeriodic() throws PlanningException {
+    int[] alloc = { 2, 5, 0 };
+    long[] timeSteps = { 1L, 2L, 3L };
+    RLESparseResourceAllocation tempPeriodic = ReservationSystemTestUtil
+        .generateRLESparseResourceAllocation(alloc, timeSteps);
+    PeriodicRLESparseResourceAllocation periodic =
+        new PeriodicRLESparseResourceAllocation(tempPeriodic, 10L);
+
+    int[] alloc2 = { 10, 10, 0 };
+    long[] timeSteps2 = { 12L, 13L, 14L };
+    RLESparseResourceAllocation nonPeriodic = ReservationSystemTestUtil
+        .generateRLESparseResourceAllocation(alloc2, timeSteps2);
+
+    RLESparseResourceAllocation merged =
+        RLESparseResourceAllocation.merge(nonPeriodic.getResourceCalculator(),
+            Resource.newInstance(100 * 1024, 100), periodic, nonPeriodic,
+            RLESparseResourceAllocation.RLEOperator.add, 2, 25);
+
+    Assert.assertEquals(Resource.newInstance(5, 5),
+        merged.getCapacityAtTime(2L));
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        merged.getCapacityAtTime(3L));
+    Assert.assertEquals(Resource.newInstance(2, 2),
+        merged.getCapacityAtTime(11L));
+    Assert.assertEquals(Resource.newInstance(15, 15),
+        merged.getCapacityAtTime(12L));
+    Assert.assertEquals(Resource.newInstance(10, 10),
+        merged.getCapacityAtTime(13L));
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        merged.getCapacityAtTime(14L));
+    Assert.assertEquals(Resource.newInstance(2, 2),
+        merged.getCapacityAtTime(21L));
+    Assert.assertEquals(Resource.newInstance(5, 5),
+        merged.getCapacityAtTime(22L));
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        merged.getCapacityAtTime(23L));
+  }
+
+  @Test
   public void testSetCapacityInInterval() {
-    int[] alloc = {2, 5, 0};
-    long[] timeSteps = {1L, 2L, 3L};
-    RLESparseResourceAllocation rleSparseVector =
-        ReservationSystemTestUtil.generateRLESparseResourceAllocation(
-            alloc, timeSteps);
+    int[] alloc = { 2, 5, 0 };
+    long[] timeSteps = { 1L, 2L, 3L };
+    RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
+        .generateRLESparseResourceAllocation(alloc, timeSteps);
     PeriodicRLESparseResourceAllocation periodicVector =
         new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L);
     ReservationInterval interval = new ReservationInterval(5L, 10L);
-    periodicVector.addInterval(
-        interval, Resource.newInstance(8, 8));
+    periodicVector.addInterval(interval, Resource.newInstance(8, 8));
     Assert.assertEquals(Resource.newInstance(8, 8),
         periodicVector.getCapacityAtTime(5L));
     Assert.assertEquals(Resource.newInstance(8, 8),
@@ -99,21 +131,20 @@ public class TestPeriodicRLESparseResourceAllocation {
         periodicVector.getCapacityAtTime(10L));
     Assert.assertEquals(Resource.newInstance(0, 0),
         periodicVector.getCapacityAtTime(0L));
-    Assert.assertFalse(periodicVector.addInterval(
-        new ReservationInterval(7L, 12L), Resource.newInstance(8, 8)));
+    // Assert.assertFalse(periodicVector.addInterval(
+    // new ReservationInterval(7L, 12L), Resource.newInstance(8, 8)));
   }
 
   public void testRemoveInterval() {
-    int[] alloc = {2, 5, 3, 4, 0};
-    long[] timeSteps = {1L, 3L, 5L, 7L, 9L};
-    RLESparseResourceAllocation rleSparseVector =
-        ReservationSystemTestUtil.generateRLESparseResourceAllocation(
-            alloc, timeSteps);
+    int[] alloc = { 2, 5, 3, 4, 0 };
+    long[] timeSteps = { 1L, 3L, 5L, 7L, 9L };
+    RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
+        .generateRLESparseResourceAllocation(alloc, timeSteps);
     PeriodicRLESparseResourceAllocation periodicVector =
         new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L);
     ReservationInterval interval = new ReservationInterval(3L, 7L);
-    Assert.assertTrue(periodicVector.removeInterval(
-        interval, Resource.newInstance(3, 3)));
+    Assert.assertTrue(
+        periodicVector.removeInterval(interval, Resource.newInstance(3, 3)));
     Assert.assertEquals(Resource.newInstance(2, 2),
         periodicVector.getCapacityAtTime(1L));
     Assert.assertEquals(Resource.newInstance(2, 2),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
index bfe46e1..0027ceb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
@@ -40,10 +40,14 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Testing the class {@link RLESparseResourceAllocation}.
+ */
+@SuppressWarnings("checkstyle:nowhitespaceafter")
 public class TestRLESparseResourceAllocation {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestRLESparseResourceAllocation.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRLESparseResourceAllocation.class);
 
   @Test
   public void testMergeAdd() throws PlanningException {
@@ -196,7 +200,8 @@ public class TestRLESparseResourceAllocation {
       // Expected!
     }
 
-    // Testing that the subtractTestNonNegative detects problems even if only one
+    // Testing that the subtractTestNonNegative detects problems even if only
+    // one
     // of the resource dimensions is "<0"
     a.put(10L, Resource.newInstance(10, 5));
     b.put(11L, Resource.newInstance(5, 6));
@@ -286,9 +291,8 @@ public class TestRLESparseResourceAllocation {
   public void testRangeOverlapping() {
     ResourceCalculator resCalc = new DefaultResourceCalculator();
 
-    RLESparseResourceAllocation r =
-        new RLESparseResourceAllocation(resCalc);
-    int[] alloc = {10, 10, 10, 10, 10, 10};
+    RLESparseResourceAllocation r = new RLESparseResourceAllocation(resCalc);
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     Set<Entry<ReservationInterval, Resource>> inputs =
         generateAllocation(start, alloc, false).entrySet();
@@ -299,9 +303,9 @@ public class TestRLESparseResourceAllocation {
     long d = r.getLatestNonNullTime();
 
     // tries to trigger "out-of-range" bug
-    r =  r.getRangeOverlapping(s, d);
-    r = r.getRangeOverlapping(s-1, d-1);
-    r = r.getRangeOverlapping(s+1, d+1);
+    r = r.getRangeOverlapping(s, d);
+    r = r.getRangeOverlapping(s - 1, d - 1);
+    r = r.getRangeOverlapping(s + 1, d + 1);
   }
 
   @Test
@@ -370,25 +374,29 @@ public class TestRLESparseResourceAllocation {
     // Current bug prevents this to pass. The RLESparseResourceAllocation
     // does not handle removal of "partial"
     // allocations correctly.
-    Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(10)
-        .getMemorySize());
-    Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemorySize());
-    Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(19).getMemorySize());
-    Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(21)
-        .getMemorySize());
-    Assert.assertEquals(2 * 102400, rleSparseVector.getCapacityAtTime(26)
-        .getMemorySize());
+    Assert.assertEquals(102400,
+        rleSparseVector.getCapacityAtTime(10).getMemorySize());
+    Assert.assertEquals(0,
+        rleSparseVector.getCapacityAtTime(13).getMemorySize());
+    Assert.assertEquals(0,
+        rleSparseVector.getCapacityAtTime(19).getMemorySize());
+    Assert.assertEquals(102400,
+        rleSparseVector.getCapacityAtTime(21).getMemorySize());
+    Assert.assertEquals(2 * 102400,
+        rleSparseVector.getCapacityAtTime(26).getMemorySize());
 
     ReservationInterval riRemove2 = new ReservationInterval(9, 13);
     rleSparseVector.removeInterval(riRemove2, rr);
     LOG.info(rleSparseVector.toString());
 
-    Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(11).getMemorySize());
-    Assert.assertEquals(-102400, rleSparseVector.getCapacityAtTime(9)
-        .getMemorySize());
-    Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemorySize());
-    Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(20)
-        .getMemorySize());
+    Assert.assertEquals(0,
+        rleSparseVector.getCapacityAtTime(11).getMemorySize());
+    Assert.assertEquals(-102400,
+        rleSparseVector.getCapacityAtTime(9).getMemorySize());
+    Assert.assertEquals(0,
+        rleSparseVector.getCapacityAtTime(13).getMemorySize());
+    Assert.assertEquals(102400,
+        rleSparseVector.getCapacityAtTime(20).getMemorySize());
 
   }
 
@@ -500,7 +508,8 @@ public class TestRLESparseResourceAllocation {
     }
     mapAllocations = rleSparseVector.toIntervalMap();
     Assert.assertTrue(mapAllocations.size() == 5);
-    for (Entry<ReservationInterval, Resource> entry : mapAllocations.entrySet()) {
+    for (Entry<ReservationInterval, Resource> entry : mapAllocations
+        .entrySet()) {
       ReservationInterval interval = entry.getKey();
       Resource resource = entry.getValue();
       if (interval.getStartTime() == 101L) {
@@ -526,59 +535,46 @@ public class TestRLESparseResourceAllocation {
 
   @Test
   public void testMaxPeriodicCapacity() {
-    long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
-    int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8};
-    RLESparseResourceAllocation rleSparseVector =
-        ReservationSystemTestUtil.generateRLESparseResourceAllocation(
-            alloc, timeSteps);
+    long[] timeSteps = { 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L };
+    int[] alloc = { 2, 5, 7, 10, 3, 4, 6, 8 };
+    RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
+        .generateRLESparseResourceAllocation(alloc, timeSteps);
     LOG.info(rleSparseVector.toString());
-    Assert.assertEquals(
-        rleSparseVector.getMaximumPeriodicCapacity(0, 1),
+    Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 1),
         Resource.newInstance(10, 10));
-    Assert.assertEquals(
-        rleSparseVector.getMaximumPeriodicCapacity(0, 2),
+    Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 2),
         Resource.newInstance(7, 7));
-    Assert.assertEquals(
-        rleSparseVector.getMaximumPeriodicCapacity(0, 3),
+    Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 3),
         Resource.newInstance(10, 10));
-    Assert.assertEquals(
-        rleSparseVector.getMaximumPeriodicCapacity(0, 4),
+    Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 4),
         Resource.newInstance(3, 3));
-    Assert.assertEquals(
-        rleSparseVector.getMaximumPeriodicCapacity(0, 5),
+    Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 5),
         Resource.newInstance(4, 4));
-    Assert.assertEquals(
-        rleSparseVector.getMaximumPeriodicCapacity(0, 5),
+    Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 5),
         Resource.newInstance(4, 4));
-    Assert.assertEquals(
-        rleSparseVector.getMaximumPeriodicCapacity(7, 5),
+    Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(7, 5),
         Resource.newInstance(8, 8));
-    Assert.assertEquals(
-        rleSparseVector.getMaximumPeriodicCapacity(10, 3),
+    Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(10, 3),
         Resource.newInstance(0, 0));
   }
 
   @Test
   public void testGetMinimumCapacityInInterval() {
-    long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
-    int[] alloc = {2, 5, 7, 10, 3, 4, 0, 8};
-    RLESparseResourceAllocation rleSparseVector =
-        ReservationSystemTestUtil.generateRLESparseResourceAllocation(
-            alloc, timeSteps);
+    long[] timeSteps = { 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L };
+    int[] alloc = { 2, 5, 7, 10, 3, 4, 0, 8 };
+    RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
+        .generateRLESparseResourceAllocation(alloc, timeSteps);
     LOG.info(rleSparseVector.toString());
-    Assert.assertEquals(
-        rleSparseVector.getMinimumCapacityInInterval(
-            new ReservationInterval(1L, 3L)), Resource.newInstance(5, 5));
-    Assert.assertEquals(
-        rleSparseVector.getMinimumCapacityInInterval(
-            new ReservationInterval(2L, 5L)), Resource.newInstance(3, 3));
-    Assert.assertEquals(
-        rleSparseVector.getMinimumCapacityInInterval(
-            new ReservationInterval(1L, 7L)), Resource.newInstance(0, 0));
+    Assert.assertEquals(rleSparseVector.getMinimumCapacityInInterval(
+        new ReservationInterval(1L, 3L)), Resource.newInstance(5, 5));
+    Assert.assertEquals(rleSparseVector.getMinimumCapacityInInterval(
+        new ReservationInterval(2L, 5L)), Resource.newInstance(3, 3));
+    Assert.assertEquals(rleSparseVector.getMinimumCapacityInInterval(
+        new ReservationInterval(1L, 7L)), Resource.newInstance(0, 0));
   }
 
-  private void setupArrays(
-      TreeMap<Long, Resource> a, TreeMap<Long, Resource> b) {
+  private void setupArrays(TreeMap<Long, Resource> a,
+      TreeMap<Long, Resource> b) {
     a.put(10L, Resource.newInstance(5, 5));
     a.put(20L, Resource.newInstance(10, 10));
     a.put(30L, Resource.newInstance(15, 15));
@@ -620,8 +616,8 @@ public class TestRLESparseResourceAllocation {
         numContainers = alloc[i];
       }
       req.put(new ReservationInterval(startTime + i, startTime + i + 1),
-          ReservationSystemUtil.toResource(ReservationRequest.newInstance(
-              Resource.newInstance(1024, 1), (numContainers))));
+          ReservationSystemUtil.toResource(ReservationRequest
+              .newInstance(Resource.newInstance(1024, 1), (numContainers))));
     }
     return req;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
index c4f94c2..ddd290d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
@@ -78,9 +79,10 @@ public class TestSimpleCapacityReplanner {
     enf.init("blah", conf);
 
     // Initialize the plan with more resources
-    InMemoryPlan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
-            res, minAlloc, maxAlloc, "dedicated", enf, true, context, clock);
+    InMemoryPlan plan = new InMemoryPlan(queueMetrics, policy, agent,
+        clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", enf, true,
+        YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
+        context, clock);
 
     // add reservation filling the plan (separating them 1ms, so we are sure
     // s2 follows s1 on acceptance


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/3] hadoop git commit: Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem.

Posted by su...@apache.org.
Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7996eca7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7996eca7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7996eca7

Branch: refs/heads/trunk
Commit: 7996eca7dcfaa1bdf970e32022274f2699bef8a1
Parents: c5281a8
Author: Subru Krishnan <su...@apache.org>
Authored: Fri Sep 1 15:16:40 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Fri Sep 1 15:16:40 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   6 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   4 +-
 .../reservation/AbstractReservationSystem.java  |  90 ++--
 .../AbstractSchedulerPlanFollower.java          | 183 ++++----
 .../reservation/InMemoryPlan.java               | 400 ++++++++++++-----
 .../InMemoryReservationAllocation.java          |  36 +-
 .../reservation/NoOverCommitPolicy.java         |   2 +-
 .../PeriodicRLESparseResourceAllocation.java    | 130 ++++--
 .../resourcemanager/reservation/PlanEdit.java   |  24 +-
 .../resourcemanager/reservation/PlanView.java   |  94 ++--
 .../RLESparseResourceAllocation.java            | 115 ++---
 .../reservation/ReservationAllocation.java      |  60 ++-
 .../reservation/ReservationInputValidator.java  | 134 +++---
 .../reservation/ReservationSystem.java          |   6 +-
 .../reservation/SharingPolicy.java              |  13 +-
 .../reservation/planning/Planner.java           |   2 +-
 .../reservation/planning/PlanningAlgorithm.java |   2 +-
 .../reservation/planning/StageAllocator.java    |   2 +-
 .../planning/StageAllocatorGreedy.java          |   2 +-
 .../planning/StageAllocatorGreedyRLE.java       |   5 +-
 .../planning/StageAllocatorLowCostAligned.java  |   4 +-
 .../reservation/ReservationSystemTestUtil.java  | 135 +++---
 .../reservation/TestInMemoryPlan.java           | 431 ++++++++++++-------
 ...TestPeriodicRLESparseResourceAllocation.java | 109 +++--
 .../TestRLESparseResourceAllocation.java        | 122 +++---
 .../planning/TestSimpleCapacityReplanner.java   |   8 +-
 26 files changed, 1342 insertions(+), 777 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4944821..27ca957 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -262,6 +262,12 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
       1000L;
 
+  /** The maximum periodicity for the Reservation System. */
+  public static final String RM_RESERVATION_SYSTEM_MAX_PERIODICITY =
+      RM_PREFIX + "reservation-system.max-periodicity";
+  public static final long DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY =
+      86400000L;
+
   /**
    * Enable periodic monitor threads.
    * @see #RM_SCHEDULER_MONITOR_POLICIES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index bd7bf93..1d3111c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.conf.TestConfigurationFieldsBase;
  */
 public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
 
-  @SuppressWarnings("deprecation")
+  @SuppressWarnings({"deprecation", "methodlength"})
   @Override
   public void initializeMemberVariables() {
     xmlFilename = new String("yarn-default.xml");
@@ -69,6 +69,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
     configurationPropsToSkipCompare.add(YarnConfiguration
         .YARN_SECURITY_SERVICE_AUTHORIZATION_COLLECTOR_NODEMANAGER_PROTOCOL);
     configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
 
     // Federation default configs to be ignored
     configurationPropsToSkipCompare

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
index 5ef4912..5b8772c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -18,6 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -46,17 +57,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 /**
  * This is the implementation of {@link ReservationSystem} based on the
  * {@link ResourceScheduler}
@@ -66,8 +66,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public abstract class AbstractReservationSystem extends AbstractService
     implements ReservationSystem {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(AbstractReservationSystem.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractReservationSystem.class);
 
   // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN
 
@@ -103,6 +103,8 @@ public abstract class AbstractReservationSystem extends AbstractService
 
   private boolean isRecoveryEnabled = false;
 
+  private long maxPeriodicity;
+
   /**
    * Construct the service.
    * 
@@ -143,36 +145,41 @@ public abstract class AbstractReservationSystem extends AbstractService
     this.conf = conf;
     scheduler = rmContext.getScheduler();
     // Get the plan step size
-    planStepSize =
-        conf.getTimeDuration(
-            YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
-            YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
-            TimeUnit.MILLISECONDS);
+    planStepSize = conf.getTimeDuration(
+        YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+        YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+        TimeUnit.MILLISECONDS);
     if (planStepSize < 0) {
       planStepSize =
           YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
     }
+    maxPeriodicity =
+        conf.getLong(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
+            YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
+    if (maxPeriodicity <= 0) {
+      maxPeriodicity =
+          YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY;
+    }
     // Create a plan corresponding to every reservable queue
     Set<String> planQueueNames = scheduler.getPlanQueues();
     for (String planQueueName : planQueueNames) {
       Plan plan = initializePlan(planQueueName);
       plans.put(planQueueName, plan);
     }
-    isRecoveryEnabled = conf.getBoolean(
-        YarnConfiguration.RECOVERY_ENABLED,
+    isRecoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
         YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
 
     if (conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE,
-            YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) &&
-                    conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
-                            YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
+        YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE)
+        && conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
+            YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
       reservationsACLsManager = new ReservationsACLsManager(scheduler, conf);
     }
   }
 
   private void loadPlan(String planName,
       Map<ReservationId, ReservationAllocationStateProto> reservations)
-          throws PlanningException {
+      throws PlanningException {
     Plan plan = plans.get(planName);
     Resource minAllocation = getMinAllocation();
     ResourceCalculator rescCalculator = getResourceCalculator();
@@ -248,8 +255,8 @@ public abstract class AbstractReservationSystem extends AbstractService
       Class<?> planFollowerPolicyClazz =
           conf.getClassByName(planFollowerPolicyClassName);
       if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
-        return (PlanFollower) ReflectionUtils.newInstance(
-            planFollowerPolicyClazz, conf);
+        return (PlanFollower) ReflectionUtils
+            .newInstance(planFollowerPolicyClazz, conf);
       } else {
         throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
             + " not instance of " + PlanFollower.class.getCanonicalName());
@@ -257,7 +264,8 @@ public abstract class AbstractReservationSystem extends AbstractService
     } catch (ClassNotFoundException e) {
       throw new YarnRuntimeException(
           "Could not instantiate PlanFollowerPolicy: "
-              + planFollowerPolicyClassName, e);
+              + planFollowerPolicyClassName,
+          e);
     }
   }
 
@@ -371,9 +379,8 @@ public abstract class AbstractReservationSystem extends AbstractService
   public ReservationId getNewReservationId() {
     writeLock.lock();
     try {
-      ReservationId resId =
-          ReservationId.newInstance(ResourceManager.getClusterTimeStamp(),
-              resCounter.incrementAndGet());
+      ReservationId resId = ReservationId.newInstance(
+          ResourceManager.getClusterTimeStamp(), resCounter.incrementAndGet());
       LOG.info("Allocated new reservationId: " + resId);
       return resId;
     } finally {
@@ -390,8 +397,11 @@ public abstract class AbstractReservationSystem extends AbstractService
    * Get the default reservation system corresponding to the scheduler
    * 
    * @param scheduler the scheduler for which the reservation system is required
+   *
+   * @return the class name of the {@link ReservationSystem} for the scheduler
    */
-  public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
+  public static String getDefaultReservationSystem(
+      ResourceScheduler scheduler) {
     if (scheduler instanceof CapacityScheduler) {
       return CapacityReservationSystem.class.getName();
     } else if (scheduler instanceof FairScheduler) {
@@ -409,12 +419,11 @@ public abstract class AbstractReservationSystem extends AbstractService
     Resource maxAllocation = getMaxAllocation();
     ResourceCalculator rescCalc = getResourceCalculator();
     Resource totCap = getPlanQueueCapacity(planQueueName);
-    Plan plan =
-        new InMemoryPlan(getRootQueueMetrics(), adPolicy,
-            getAgent(planQueuePath), totCap, planStepSize, rescCalc,
-            minAllocation, maxAllocation, planQueueName,
-            getReplanner(planQueuePath), getReservationSchedulerConfiguration()
-            .getMoveOnExpiry(planQueuePath), rmContext);
+    Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy,
+        getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation,
+        maxAllocation, planQueueName, getReplanner(planQueuePath),
+        getReservationSchedulerConfiguration().getMoveOnExpiry(planQueuePath),
+        maxPeriodicity, rmContext);
     LOG.info("Initialized plan {} based on reservable queue {}",
         plan.toString(), planQueueName);
     return plan;
@@ -477,8 +486,8 @@ public abstract class AbstractReservationSystem extends AbstractService
       Class<?> admissionPolicyClazz =
           conf.getClassByName(admissionPolicyClassName);
       if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
-        return (SharingPolicy) ReflectionUtils.newInstance(
-            admissionPolicyClazz, conf);
+        return (SharingPolicy) ReflectionUtils.newInstance(admissionPolicyClazz,
+            conf);
       } else {
         throw new YarnRuntimeException("Class: " + admissionPolicyClassName
             + " not instance of " + SharingPolicy.class.getCanonicalName());
@@ -493,8 +502,7 @@ public abstract class AbstractReservationSystem extends AbstractService
     return this.reservationsACLsManager;
   }
 
-  protected abstract ReservationSchedulerConfiguration
-      getReservationSchedulerConfiguration();
+  protected abstract ReservationSchedulerConfiguration getReservationSchedulerConfiguration();
 
   protected abstract String getPlanQueuePath(String planQueueName);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
index 90357e3..9b6a0b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
@@ -18,6 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -33,24 +41,17 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(AbstractSchedulerPlanFollower.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractSchedulerPlanFollower.class);
 
   protected Collection<Plan> plans = new ArrayList<Plan>();
   protected YarnScheduler scheduler;
   protected Clock clock;
 
   @Override
-  public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
+  public void init(Clock clock, ResourceScheduler sched,
+      Collection<Plan> plans) {
     this.clock = clock;
     this.scheduler = sched;
     this.plans.addAll(plans);
@@ -71,7 +72,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
 
   @Override
   public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) {
-     String planQueueName = plan.getQueueName();
+    String planQueueName = plan.getQueueName();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
     }
@@ -82,12 +83,14 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
       now += step - (now % step);
     }
     Queue planQueue = getPlanQueue(planQueueName);
-    if (planQueue == null) return;
+    if (planQueue == null) {
+      return;
+    }
 
     // first we publish to the plan the current availability of resources
     Resource clusterResources = scheduler.getClusterResource();
-    Resource planResources = getPlanResources(plan, planQueue,
-        clusterResources);
+    Resource planResources =
+        getPlanResources(plan, planQueue, clusterResources);
     Set<ReservationAllocation> currentReservations =
         plan.getReservationsAtTime(now);
     Set<String> curReservationNames = new HashSet<String>();
@@ -95,12 +98,11 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
     int numRes = getReservedResources(now, currentReservations,
         curReservationNames, reservedResources);
     // create the default reservation queue if it doesnt exist
-    String defReservationId = getReservationIdFromQueueName(planQueueName) +
-        ReservationConstants.DEFAULT_QUEUE_SUFFIX;
-    String defReservationQueue = getReservationQueueName(planQueueName,
-        defReservationId);
-    createDefaultReservationQueue(planQueueName, planQueue,
-        defReservationId);
+    String defReservationId = getReservationIdFromQueueName(planQueueName)
+        + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
+    String defReservationQueue =
+        getReservationQueueName(planQueueName, defReservationId);
+    createDefaultReservationQueue(planQueueName, planQueue, defReservationId);
     curReservationNames.add(defReservationId);
     // if the resources dedicated to this plan has shrunk invoke replanner
     boolean shouldResize = false;
@@ -149,10 +151,8 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
       // sort allocations from the one giving up the most resources, to the
       // one asking for the most avoid order-of-operation errors that
       // temporarily violate 100% capacity bound
-      List<ReservationAllocation> sortedAllocations =
-          sortByDelta(
-              new ArrayList<ReservationAllocation>(currentReservations), now,
-              plan);
+      List<ReservationAllocation> sortedAllocations = sortByDelta(
+          new ArrayList<ReservationAllocation>(currentReservations), now, plan);
       for (ReservationAllocation res : sortedAllocations) {
         String currResId = res.getReservationId().toString();
         if (curReservationNames.contains(currResId)) {
@@ -163,10 +163,9 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
         if (planResources.getMemorySize() > 0
             && planResources.getVirtualCores() > 0) {
           if (shouldResize) {
-            capToAssign =
-                calculateReservationToPlanProportion(
-                    plan.getResourceCalculator(), planResources,
-                    reservedResources, capToAssign);
+            capToAssign = calculateReservationToPlanProportion(
+                plan.getResourceCalculator(), planResources, reservedResources,
+                capToAssign);
           }
           targetCapacity =
               calculateReservationToPlanRatio(plan.getResourceCalculator(),
@@ -185,7 +184,8 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
           maxCapacity = targetCapacity;
         }
         try {
-          setQueueEntitlement(planQueueName, currResId, targetCapacity, maxCapacity);
+          setQueueEntitlement(planQueueName, currResId, targetCapacity,
+              maxCapacity);
         } catch (YarnException e) {
           LOG.warn("Exception while trying to size reservation for plan: {}",
               currResId, planQueueName, e);
@@ -196,9 +196,10 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
     // compute the default queue capacity
     float defQCap = 1.0f - totalAssignedCapacity;
     if (LOG.isDebugEnabled()) {
-      LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
-          + "currReservation: {} default-queue capacity: {}", planResources,
-          numRes, defQCap);
+      LOG.debug(
+          "PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+              + "currReservation: {} default-queue capacity: {}",
+          planResources, numRes, defQCap);
     }
     // set the default queue to eat-up all remaining capacity
     try {
@@ -225,12 +226,11 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
   }
 
   protected void setQueueEntitlement(String planQueueName, String currResId,
-      float targetCapacity,
-      float maxCapacity) throws YarnException {
-    String reservationQueueName = getReservationQueueName(planQueueName,
-        currResId);
-    scheduler.setEntitlement(reservationQueueName, new QueueEntitlement(
-        targetCapacity, maxCapacity));
+      float targetCapacity, float maxCapacity) throws YarnException {
+    String reservationQueueName =
+        getReservationQueueName(planQueueName, currResId);
+    scheduler.setEntitlement(reservationQueueName,
+        new QueueEntitlement(targetCapacity, maxCapacity));
   }
 
   // Schedulers have different ways of naming queues. See YARN-2773
@@ -244,14 +244,21 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
    * Then move all apps in the set of queues to the parent plan queue's default
    * reservation queue if move is enabled. Finally cleanups the queue by killing
    * any apps (if move is disabled or move failed) and removing the queue
+   *
+   * @param planQueueName the name of {@code PlanQueue}
+   * @param shouldMove flag to indicate if any running apps should be moved or
+   *          killed
+   * @param toRemove the ids of the expired reservations to clean up
+   * @param defReservationQueue the default {@code ReservationQueue} of the
+   *          {@link Plan}
    */
-  protected void cleanupExpiredQueues(String planQueueName,
-      boolean shouldMove, Set<String> toRemove, String defReservationQueue) {
+  protected void cleanupExpiredQueues(String planQueueName, boolean shouldMove,
+      Set<String> toRemove, String defReservationQueue) {
     for (String expiredReservationId : toRemove) {
       try {
         // reduce entitlement to 0
-        String expiredReservation = getReservationQueueName(planQueueName,
-            expiredReservationId);
+        String expiredReservation =
+            getReservationQueueName(planQueueName, expiredReservationId);
         setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f);
         if (shouldMove) {
           moveAppsInQueueSync(expiredReservation, defReservationQueue);
@@ -275,7 +282,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
    * reservation queue in a synchronous fashion
    */
   private void moveAppsInQueueSync(String expiredReservation,
-                                   String defReservationQueue) {
+      String defReservationQueue) {
     List<ApplicationAttemptId> activeApps =
         scheduler.getAppsInQueue(expiredReservation);
     if (activeApps.isEmpty()) {
@@ -287,16 +294,16 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
         scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
       } catch (YarnException e) {
         LOG.warn(
-            "Encountered unexpected error during migration of application: {}" +
-                " from reservation: {}",
+            "Encountered unexpected error during migration of application: {}"
+                + " from reservation: {}",
             app, expiredReservation, e);
       }
     }
   }
 
-  protected int getReservedResources(long now, Set<ReservationAllocation>
-      currentReservations, Set<String> curReservationNames,
-                                     Resource reservedResources) {
+  protected int getReservedResources(long now,
+      Set<ReservationAllocation> currentReservations,
+      Set<String> curReservationNames, Resource reservedResources) {
     int numRes = 0;
     if (currentReservations != null) {
       numRes = currentReservations.size();
@@ -312,23 +319,30 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
    * Sort in the order from the least new amount of resources asked (likely
    * negative) to the highest. This prevents "order-of-operation" errors related
    * to exceeding 100% capacity temporarily.
+   *
+   * @param currentReservations the currently active reservations
+   * @param now the current time
+   * @param plan the {@link Plan} that is being considered
+   *
+   * @return the sorted list of {@link ReservationAllocation}s
    */
   protected List<ReservationAllocation> sortByDelta(
       List<ReservationAllocation> currentReservations, long now, Plan plan) {
-    Collections.sort(currentReservations, new ReservationAllocationComparator(
-        now, this, plan));
+    Collections.sort(currentReservations,
+        new ReservationAllocationComparator(now, this, plan));
     return currentReservations;
   }
 
   /**
-   * Get queue associated with reservable queue named
-   * @param planQueueName Name of the reservable queue
+   * Get queue associated with reservable queue named.
+   *
+   * @param planQueueName name of the reservable queue
    * @return queue associated with the reservable queue
    */
   protected abstract Queue getPlanQueue(String planQueueName);
 
   /**
-   * Resizes reservations based on currently available resources
+   * Resizes reservations based on currently available resources.
    */
   private Resource calculateReservationToPlanProportion(
       ResourceCalculator rescCalculator, Resource availablePlanResources,
@@ -338,7 +352,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
   }
 
   /**
-   * Calculates ratio of reservationResources to planResources
+   * Calculates ratio of reservationResources to planResources.
    */
   private float calculateReservationToPlanRatio(
       ResourceCalculator rescCalculator, Resource clusterResources,
@@ -348,7 +362,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
   }
 
   /**
-   * Check if plan resources are less than expected reservation resources
+   * Check if plan resources are less than expected reservation resources.
    */
   private boolean arePlanResourcesLessThanReservations(
       ResourceCalculator rescCalculator, Resource clusterResources,
@@ -358,38 +372,56 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
   }
 
   /**
-   * Get a list of reservation queues for this planQueue
+   * Get a list of reservation queues for this planQueue.
+   *
+   * @param planQueue the queue for the current {@link Plan}
+   *
+   * @return the queues corresponding to the reservations
    */
   protected abstract List<? extends Queue> getChildReservationQueues(
       Queue planQueue);
 
   /**
-   * Add a new reservation queue for reservation currResId for this planQueue
+   * Add a new reservation queue for reservation currResId for this planQueue.
    */
-  protected abstract void addReservationQueue(
-      String planQueueName, Queue queue, String currResId);
+  protected abstract void addReservationQueue(String planQueueName, Queue queue,
+      String currResId);
 
   /**
-   * Creates the default reservation queue for use when no reservation is
-   * used for applications submitted to this planQueue
+   * Creates the default reservation queue for use when no reservation is used
+   * for applications submitted to this planQueue.
+   *
+   * @param planQueueName name of the reservable queue
+   * @param queue the queue for the current {@link Plan}
+   * @param defReservationQueue name of the default {@code ReservationQueue}
    */
-  protected abstract void createDefaultReservationQueue(
-      String planQueueName, Queue queue, String defReservationQueue);
+  protected abstract void createDefaultReservationQueue(String planQueueName,
+      Queue queue, String defReservationQueue);
 
   /**
-   * Get plan resources for this planQueue
+   * Get plan resources for this planQueue.
+   *
+   * @param plan the current {@link Plan} being considered
+   * @param queue the queue for the current {@link Plan}
+   * @param clusterResources the resources available in the cluster
+   * @return the resources allocated to the specified {@link Plan}
    */
-  protected abstract Resource getPlanResources(
-      Plan plan, Queue queue, Resource clusterResources);
+  protected abstract Resource getPlanResources(Plan plan, Queue queue,
+      Resource clusterResources);
 
   /**
    * Get reservation queue resources if it exists otherwise return null.
+   *
+   * @param plan the current {@link Plan} being considered
+   * @param reservationId the identifier of the reservation
+   *
+   * @return the resources allocated to the specified reservation
    */
   protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
       ReservationId reservationId);
 
-  private static class ReservationAllocationComparator implements
-      Comparator<ReservationAllocation> {
+  private static class ReservationAllocationComparator
+      implements Comparator<ReservationAllocation> {
     AbstractSchedulerPlanFollower planFollower;
     long now;
     Plan plan;
@@ -404,14 +436,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
     private Resource getUnallocatedReservedResources(
         ReservationAllocation reservation) {
       Resource resResource;
-      Resource reservationResource = planFollower
-          .getReservationQueueResourceIfExists
-              (plan, reservation.getReservationId());
+      Resource reservationResource =
+          planFollower.getReservationQueueResourceIfExists(plan,
+              reservation.getReservationId());
       if (reservationResource != null) {
-        resResource =
-            Resources.subtract(
-                reservation.getResourcesAtTime(now),
-                reservationResource);
+        resResource = Resources.subtract(reservation.getResourcesAtTime(now),
+            reservationResource);
       } else {
         resResource = reservation.getResourcesAtTime(now);
       }
@@ -428,4 +458,3 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 783fd09..9eb1820 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,9 +33,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
@@ -64,9 +65,14 @@ public class InMemoryPlan implements Plan {
 
   private RLESparseResourceAllocation rleSparseVector;
 
+  private PeriodicRLESparseResourceAllocation periodicRle;
+
   private Map<String, RLESparseResourceAllocation> userResourceAlloc =
       new HashMap<String, RLESparseResourceAllocation>();
 
+  private Map<String, RLESparseResourceAllocation> userPeriodicResourceAlloc =
+      new HashMap<String, RLESparseResourceAllocation>();
+
   private Map<String, RLESparseResourceAllocation> userActiveReservationCount =
       new HashMap<String, RLESparseResourceAllocation>();
 
@@ -96,15 +102,27 @@ public class InMemoryPlan implements Plan {
       String queueName, Planner replanner, boolean getMoveOnExpiry,
       RMContext rmContext) {
     this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
-        maxAlloc, queueName, replanner, getMoveOnExpiry, rmContext,
-        new UTCClock());
+        maxAlloc, queueName, replanner, getMoveOnExpiry,
+        YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
+        rmContext);
   }
 
   public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
       ReservationAgent agent, Resource totalCapacity, long step,
       ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
       String queueName, Planner replanner, boolean getMoveOnExpiry,
-      RMContext rmContext, Clock clock) {
+      long maxPeriodicty, RMContext rmContext) {
+    this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
+        maxAlloc, queueName, replanner, getMoveOnExpiry, maxPeriodicty,
+        rmContext, new UTCClock());
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+      ReservationAgent agent, Resource totalCapacity, long step,
+      ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
+      String queueName, Planner replanner, boolean getMoveOnExpiry,
+      long maxPeriodicty, RMContext rmContext, Clock clock) {
     this.queueMetrics = queueMetrics;
     this.policy = policy;
     this.agent = agent;
@@ -114,6 +132,8 @@ public class InMemoryPlan implements Plan {
     this.minAlloc = minAlloc;
     this.maxAlloc = maxAlloc;
     this.rleSparseVector = new RLESparseResourceAllocation(resCalc);
+    this.periodicRle =
+        new PeriodicRLESparseResourceAllocation(resCalc, maxPeriodicty);
     this.queueName = queueName;
     this.replanner = replanner;
     this.getMoveOnExpiry = getMoveOnExpiry;
@@ -126,6 +146,39 @@ public class InMemoryPlan implements Plan {
     return queueMetrics;
   }
 
+  private RLESparseResourceAllocation getUserRLEResourceAllocation(String user,
+      long period) {
+    RLESparseResourceAllocation resAlloc = null;
+    if (period > 0) {
+      if (userPeriodicResourceAlloc.containsKey(user)) {
+        resAlloc = userPeriodicResourceAlloc.get(user);
+      } else {
+        resAlloc = new PeriodicRLESparseResourceAllocation(resCalc,
+            periodicRle.getTimePeriod());
+        userPeriodicResourceAlloc.put(user, resAlloc);
+      }
+    } else {
+      if (userResourceAlloc.containsKey(user)) {
+        resAlloc = userResourceAlloc.get(user);
+      } else {
+        resAlloc = new RLESparseResourceAllocation(resCalc);
+        userResourceAlloc.put(user, resAlloc);
+      }
+    }
+    return resAlloc;
+  }
+
+  private void gcUserRLEResourceAllocation(String user, long period) {
+    if (period > 0) {
+      if (userPeriodicResourceAlloc.get(user).isEmpty()) {
+        userPeriodicResourceAlloc.remove(user);
+      }
+    } else {
+      if (userResourceAlloc.get(user).isEmpty()) {
+        userResourceAlloc.remove(user);
+      }
+    }
+  }
 
   private void incrementAllocation(ReservationAllocation reservation) {
     assert (readWriteLock.isWriteLockedByCurrentThread());
@@ -133,11 +186,10 @@ public class InMemoryPlan implements Plan {
         reservation.getAllocationRequests();
     // check if we have encountered the user earlier and if not add an entry
     String user = reservation.getUser();
-    RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
-    if (resAlloc == null) {
-      resAlloc = new RLESparseResourceAllocation(resCalc);
-      userResourceAlloc.put(user, resAlloc);
-    }
+    long period = reservation.getPeriodicity();
+    RLESparseResourceAllocation resAlloc =
+        getUserRLEResourceAllocation(user, period);
+
     RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
     if (resCount == null) {
       resCount = new RLESparseResourceAllocation(resCalc);
@@ -149,14 +201,43 @@ public class InMemoryPlan implements Plan {
 
     for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
         .entrySet()) {
-      resAlloc.addInterval(r.getKey(), r.getValue());
-      rleSparseVector.addInterval(r.getKey(), r.getValue());
-      if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
-          ZERO_RESOURCE)) {
-        earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
-        latestActive = Math.max(latestActive, r.getKey().getEndTime());
+
+      if (period > 0L) {
+        for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {
+
+          long rStart = r.getKey().getStartTime() + i * period;
+          long rEnd = r.getKey().getEndTime() + i * period;
+
+          // handle wrap-around
+          if (rEnd > periodicRle.getTimePeriod()) {
+            long diff = rEnd - periodicRle.getTimePeriod();
+            rEnd = periodicRle.getTimePeriod();
+            ReservationInterval newInterval = new ReservationInterval(0, diff);
+            periodicRle.addInterval(newInterval, r.getValue());
+            resAlloc.addInterval(newInterval, r.getValue());
+          }
+
+          ReservationInterval newInterval =
+              new ReservationInterval(rStart, rEnd);
+          periodicRle.addInterval(newInterval, r.getValue());
+          resAlloc.addInterval(newInterval, r.getValue());
+        }
+
+      } else {
+        rleSparseVector.addInterval(r.getKey(), r.getValue());
+        resAlloc.addInterval(r.getKey(), r.getValue());
+        if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+            ZERO_RESOURCE)) {
+          earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+          latestActive = Math.max(latestActive, r.getKey().getEndTime());
+        }
       }
     }
+    // periodic reservations are active from start time and good till cancelled
+    if (period > 0L) {
+      earliestActive = reservation.getStartTime();
+      latestActive = Long.MAX_VALUE;
+    }
     resCount.addInterval(new ReservationInterval(earliestActive, latestActive),
         Resource.newInstance(1, 1));
   }
@@ -166,27 +247,55 @@ public class InMemoryPlan implements Plan {
     Map<ReservationInterval, Resource> allocationRequests =
         reservation.getAllocationRequests();
     String user = reservation.getUser();
-    RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+    long period = reservation.getPeriodicity();
+    RLESparseResourceAllocation resAlloc =
+        getUserRLEResourceAllocation(user, period);
 
     long earliestActive = Long.MAX_VALUE;
     long latestActive = Long.MIN_VALUE;
     for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
         .entrySet()) {
-      resAlloc.removeInterval(r.getKey(), r.getValue());
-      rleSparseVector.removeInterval(r.getKey(), r.getValue());
-      if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
-          ZERO_RESOURCE)) {
-        earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
-        latestActive = Math.max(latestActive, r.getKey().getEndTime());
+      if (period > 0L) {
+        for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {
+
+          long rStart = r.getKey().getStartTime() + i * period;
+          long rEnd = r.getKey().getEndTime() + i * period;
+
+          // handle wrap-around
+          if (rEnd > periodicRle.getTimePeriod()) {
+            long diff = rEnd - periodicRle.getTimePeriod();
+            rEnd = periodicRle.getTimePeriod();
+            ReservationInterval newInterval = new ReservationInterval(0, diff);
+            periodicRle.removeInterval(newInterval, r.getValue());
+            resAlloc.removeInterval(newInterval, r.getValue());
+          }
+
+          ReservationInterval newInterval =
+              new ReservationInterval(rStart, rEnd);
+          periodicRle.removeInterval(newInterval, r.getValue());
+          resAlloc.removeInterval(newInterval, r.getValue());
+        }
+      } else {
+        rleSparseVector.removeInterval(r.getKey(), r.getValue());
+        resAlloc.removeInterval(r.getKey(), r.getValue());
+        if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+            ZERO_RESOURCE)) {
+          earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+          latestActive = Math.max(latestActive, r.getKey().getEndTime());
+        }
       }
     }
-    if (resAlloc.isEmpty()) {
-      userResourceAlloc.remove(user);
-    }
+    gcUserRLEResourceAllocation(user, period);
 
     RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
-    resCount.removeInterval(new ReservationInterval(earliestActive,
-        latestActive), Resource.newInstance(1, 1));
+    // periodic reservations are active from start time and good till cancelled
+    if (period > 0L) {
+      earliestActive = reservation.getStartTime();
+      latestActive = Long.MAX_VALUE;
+    }
+    resCount.removeInterval(
+        new ReservationInterval(earliestActive, latestActive),
+        Resource.newInstance(1, 1));
     if (resCount.isEmpty()) {
       userActiveReservationCount.remove(user);
     }
@@ -198,9 +307,9 @@ public class InMemoryPlan implements Plan {
       if (currentReservations != null) {
         Set<ReservationAllocation> flattenedReservations =
             new TreeSet<ReservationAllocation>();
-        for (Set<InMemoryReservationAllocation> reservationEntries :
-            currentReservations.values()) {
-          flattenedReservations.addAll(reservationEntries);
+        for (Set<InMemoryReservationAllocation> res : currentReservations
+            .values()) {
+          flattenedReservations.addAll(res);
         }
         return flattenedReservations;
       } else {
@@ -218,19 +327,16 @@ public class InMemoryPlan implements Plan {
     InMemoryReservationAllocation inMemReservation =
         (InMemoryReservationAllocation) reservation;
     if (inMemReservation.getUser() == null) {
-      String errMsg =
-          "The specified Reservation with ID "
-              + inMemReservation.getReservationId()
-              + " is not mapped to any user";
+      String errMsg = "The specified Reservation with ID "
+          + inMemReservation.getReservationId() + " is not mapped to any user";
       LOG.error(errMsg);
       throw new IllegalArgumentException(errMsg);
     }
     writeLock.lock();
     try {
       if (reservationTable.containsKey(inMemReservation.getReservationId())) {
-        String errMsg =
-            "The specified Reservation with ID "
-                + inMemReservation.getReservationId() + " already exists";
+        String errMsg = "The specified Reservation with ID "
+            + inMemReservation.getReservationId() + " already exists";
         LOG.error(errMsg);
         throw new IllegalArgumentException(errMsg);
       }
@@ -246,9 +352,8 @@ public class InMemoryPlan implements Plan {
               getQueueName(), inMemReservation.getReservationId().toString());
         }
       }
-      ReservationInterval searchInterval =
-          new ReservationInterval(inMemReservation.getStartTime(),
-              inMemReservation.getEndTime());
+      ReservationInterval searchInterval = new ReservationInterval(
+          inMemReservation.getStartTime(), inMemReservation.getEndTime());
       Set<InMemoryReservationAllocation> reservations =
           currentReservations.get(searchInterval);
       if (reservations == null) {
@@ -280,9 +385,8 @@ public class InMemoryPlan implements Plan {
       ReservationId resId = reservation.getReservationId();
       ReservationAllocation currReservation = getReservationById(resId);
       if (currReservation == null) {
-        String errMsg =
-            "The specified Reservation with ID " + resId
-                + " does not exist in the plan";
+        String errMsg = "The specified Reservation with ID " + resId
+            + " does not exist in the plan";
         LOG.error(errMsg);
         throw new IllegalArgumentException(errMsg);
       }
@@ -318,9 +422,8 @@ public class InMemoryPlan implements Plan {
 
   private boolean removeReservation(ReservationAllocation reservation) {
     assert (readWriteLock.isWriteLockedByCurrentThread());
-    ReservationInterval searchInterval =
-        new ReservationInterval(reservation.getStartTime(),
-            reservation.getEndTime());
+    ReservationInterval searchInterval = new ReservationInterval(
+        reservation.getStartTime(), reservation.getEndTime());
     Set<InMemoryReservationAllocation> reservations =
         currentReservations.get(searchInterval);
     if (reservations != null) {
@@ -337,16 +440,15 @@ public class InMemoryPlan implements Plan {
         currentReservations.remove(searchInterval);
       }
     } else {
-      String errMsg =
-          "The specified Reservation with ID " + reservation.getReservationId()
-              + " does not exist in the plan";
+      String errMsg = "The specified Reservation with ID "
+          + reservation.getReservationId() + " does not exist in the plan";
       LOG.error(errMsg);
       throw new IllegalArgumentException(errMsg);
     }
     reservationTable.remove(reservation.getReservationId());
     decrementAllocation(reservation);
     LOG.info("Sucessfully deleted reservation: {} in plan.",
-            reservation.getReservationId());
+        reservation.getReservationId());
     return true;
   }
 
@@ -356,9 +458,8 @@ public class InMemoryPlan implements Plan {
     try {
       ReservationAllocation reservation = getReservationById(reservationID);
       if (reservation == null) {
-        String errMsg =
-            "The specified Reservation with ID " + reservationID
-                + " does not exist in the plan";
+        String errMsg = "The specified Reservation with ID " + reservationID
+            + " does not exist in the plan";
         LOG.error(errMsg);
         throw new IllegalArgumentException(errMsg);
       }
@@ -453,66 +554,90 @@ public class InMemoryPlan implements Plan {
       long start, long end) {
     readLock.lock();
     try {
+      // merge periodic and non-periodic allocations
       RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
+      RLESparseResourceAllocation userPeriodicResAlloc =
+          userPeriodicResourceAlloc.get(user);
 
+      if (userResAlloc != null && userPeriodicResAlloc != null) {
+        return RLESparseResourceAllocation.merge(resCalc, totalCapacity,
+            userResAlloc, userPeriodicResAlloc, RLEOperator.add, start, end);
+      }
       if (userResAlloc != null) {
         return userResAlloc.getRangeOverlapping(start, end);
-      } else {
-        return new RLESparseResourceAllocation(resCalc);
       }
+      if (userPeriodicResAlloc != null) {
+        return userPeriodicResAlloc.getRangeOverlapping(start, end);
+      }
+    } catch (PlanningException e) {
+      LOG.warn("Exception while trying to merge periodic"
+          + " and non-periodic user allocations: {}", e.getMessage(), e);
     } finally {
       readLock.unlock();
     }
+    return new RLESparseResourceAllocation(resCalc);
   }
 
   @Override
   public Resource getTotalCommittedResources(long t) {
     readLock.lock();
     try {
-      return rleSparseVector.getCapacityAtTime(t);
+      return Resources.add(rleSparseVector.getCapacityAtTime(t),
+          periodicRle.getCapacityAtTime(t));
     } finally {
       readLock.unlock();
     }
   }
 
   @Override
-  public Set<ReservationAllocation> getReservations(ReservationId
-                    reservationID, ReservationInterval interval) {
+  public Set<ReservationAllocation> getReservations(ReservationId reservationID,
+      ReservationInterval interval) {
     return getReservations(reservationID, interval, null);
   }
 
   @Override
-  public Set<ReservationAllocation> getReservations(ReservationId
-                    reservationID, ReservationInterval interval, String user) {
+  public Set<ReservationAllocation> getReservations(ReservationId reservationID,
+      ReservationInterval interval, String user) {
     if (reservationID != null) {
       ReservationAllocation allocation = getReservationById(reservationID);
-      if (allocation == null){
+      if (allocation == null) {
         return Collections.emptySet();
       }
       return Collections.singleton(allocation);
     }
 
-    long startTime = interval == null? 0 : interval.getStartTime();
-    long endTime = interval == null? Long.MAX_VALUE : interval.getEndTime();
+    long startTime = interval == null ? 0 : interval.getStartTime();
+    long endTime = interval == null ? Long.MAX_VALUE : interval.getEndTime();
 
     ReservationInterval searchInterval =
-            new ReservationInterval(endTime, Long.MAX_VALUE);
+        new ReservationInterval(endTime, Long.MAX_VALUE);
     readLock.lock();
     try {
-      SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>>
-            reservations = currentReservations.headMap(searchInterval, true);
-      if (!reservations.isEmpty()) {
-        Set<ReservationAllocation> flattenedReservations =
-                new HashSet<>();
-        for (Set<InMemoryReservationAllocation> reservationEntries :
-                reservations.values()) {
-          for (InMemoryReservationAllocation res : reservationEntries) {
-            if (res.getEndTime() > startTime) {
-              if (user != null && !user.isEmpty()
-                      && !res.getUser().equals(user)) {
-                continue;
+      SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> res =
+          currentReservations.headMap(searchInterval, true);
+      if (!res.isEmpty()) {
+        Set<ReservationAllocation> flattenedReservations = new HashSet<>();
+        for (Set<InMemoryReservationAllocation> resEntries : res.values()) {
+          for (InMemoryReservationAllocation reservation : resEntries) {
+            // validate user
+            if (user != null && !user.isEmpty()
+                && !reservation.getUser().equals(user)) {
+              continue;
+            }
+            // handle periodic reservations
+            long period = reservation.getPeriodicity();
+            if (period > 0) {
+              long t = endTime % period;
+              // check for both contained and wrap-around reservations
+              if ((t - startTime) * (t - endTime)
+                  * (startTime - endTime) >= 0) {
+                flattenedReservations.add(reservation);
+              }
+            } else {
+              // check for non-periodic reservations
+              if (reservation.getEndTime() > startTime) {
+                flattenedReservations.add(reservation);
               }
-              flattenedReservations.add(res);
             }
           }
         }
@@ -550,36 +675,82 @@ public class InMemoryPlan implements Plan {
 
   @Override
   public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
-      ReservationId oldId, long start, long end) throws PlanningException {
+      ReservationId oldId, long start, long end, long period)
+      throws PlanningException {
     readLock.lock();
     try {
-      // create RLE of totCapacity
-      TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
-      totAvailable.put(start, Resources.clone(totalCapacity));
-      RLESparseResourceAllocation totRLEAvail =
-          new RLESparseResourceAllocation(totAvailable, resCalc);
-
-      // subtract used from available
-      RLESparseResourceAllocation netAvailable;
-
-      netAvailable =
-          RLESparseResourceAllocation.merge(resCalc,
-              Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
-              RLEOperator.subtractTestNonNegative, start, end);
-
-      // add back in old reservation used resources if any
-      ReservationAllocation old = reservationTable.get(oldId);
-      if (old != null) {
-        netAvailable =
-            RLESparseResourceAllocation.merge(resCalc,
-                Resources.clone(totalCapacity), netAvailable,
-                old.getResourcesOverTime(), RLEOperator.add, start, end);
+
+      // for non-periodic return simple available resources
+      if (period == 0) {
+
+        // create RLE of totCapacity
+        TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
+        totAvailable.put(start, Resources.clone(totalCapacity));
+        RLESparseResourceAllocation totRLEAvail =
+            new RLESparseResourceAllocation(totAvailable, resCalc);
+
+        // subtract used from available
+        RLESparseResourceAllocation netAvailable;
+
+        netAvailable = RLESparseResourceAllocation.merge(resCalc,
+            Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
+            RLEOperator.subtractTestNonNegative, start, end);
+
+        // remove periodic component
+        netAvailable = RLESparseResourceAllocation.merge(resCalc,
+            Resources.clone(totalCapacity), netAvailable, periodicRle,
+            RLEOperator.subtractTestNonNegative, start, end);
+
+        // add back in old reservation used resources if any
+        ReservationAllocation old = reservationTable.get(oldId);
+        if (old != null) {
+
+          RLESparseResourceAllocation addBackPrevious =
+              old.getResourcesOverTime(start, end);
+          netAvailable = RLESparseResourceAllocation.merge(resCalc,
+              Resources.clone(totalCapacity), netAvailable, addBackPrevious,
+              RLEOperator.add, start, end);
+        }
+        // lower it if this is needed by the sharing policy
+        netAvailable = getSharingPolicy().availableResources(netAvailable, this,
+            user, oldId, start, end);
+        return netAvailable;
+      } else {
+
+        if (periodicRle.getTimePeriod() % period != 0) {
+          throw new PlanningException("The reservation periodicity (" + period
+              + ") must be" + "an exact divider of the system maxPeriod ("
+              + periodicRle.getTimePeriod() + ")");
+        }
+
+        // find the minimum resources available among all the instances that fit
+        // in the LCM
+        long numInstInLCM = periodicRle.getTimePeriod() / period;
+
+        RLESparseResourceAllocation minOverLCM =
+            getAvailableResourceOverTime(user, oldId, start, end, 0);
+        for (int i = 1; i < numInstInLCM; i++) {
+
+          long rStart = start + i * period;
+          long rEnd = end + i * period;
+
+          // recursive invocation of non-periodic range (to pick raw-info)
+          RLESparseResourceAllocation snapShot =
+              getAvailableResourceOverTime(user, oldId, rStart, rEnd, 0);
+
+          // time-align on start
+          snapShot.shift(-(i * period));
+
+          // pick the minimum amount of resources in each time interval
+          minOverLCM =
+              RLESparseResourceAllocation.merge(resCalc, getTotalCapacity(),
+                  minOverLCM, snapShot, RLEOperator.min, start, end);
+
+        }
+
+        return minOverLCM;
+
       }
-      // lower it if this is needed by the sharing policy
-      netAvailable =
-          getSharingPolicy().availableResources(netAvailable, this, user,
-              oldId, start, end);
-      return netAvailable;
     } finally {
       readLock.unlock();
     }
@@ -637,7 +808,7 @@ public class InMemoryPlan implements Plan {
   public String toCumulativeString() {
     readLock.lock();
     try {
-      return rleSparseVector.toString();
+      return rleSparseVector.toString() + "\n" + periodicRle.toString();
     } finally {
       readLock.unlock();
     }
@@ -689,11 +860,18 @@ public class InMemoryPlan implements Plan {
   }
 
   @Override
-  public RLESparseResourceAllocation getCumulativeLoadOverTime(
-      long start, long end) {
+  public RLESparseResourceAllocation getCumulativeLoadOverTime(long start,
+      long end) throws PlanningException {
     readLock.lock();
     try {
-      return rleSparseVector.getRangeOverlapping(start, end);
+
+      RLESparseResourceAllocation ret =
+          rleSparseVector.getRangeOverlapping(start, end);
+      ret = RLESparseResourceAllocation.merge(resCalc, totalCapacity, ret,
+          periodicRle.getRangeOverlapping(start, end), RLEOperator.add, start,
+          end);
+
+      return ret;
     } finally {
       readLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index 69fd43f..00c8e44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -42,6 +42,7 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
   private final Map<ReservationInterval, Resource> allocationRequests;
   private boolean hasGang = false;
   private long acceptedAt = -1;
+  private long periodicity = 0;
 
   private RLESparseResourceAllocation resourcesOverTime;
 
@@ -67,9 +68,16 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
     this.allocationRequests = allocations;
     this.planName = planName;
     this.hasGang = hasGang;
-    resourcesOverTime = new RLESparseResourceAllocation(calculator);
-    for (Map.Entry<ReservationInterval, Resource> r : allocations
-        .entrySet()) {
+    if (contract != null && contract.getRecurrenceExpression() != null) {
+      this.periodicity = Long.parseLong(contract.getRecurrenceExpression());
+    }
+    if (periodicity > 0) {
+      resourcesOverTime =
+          new PeriodicRLESparseResourceAllocation(calculator, periodicity);
+    } else {
+      resourcesOverTime = new RLESparseResourceAllocation(calculator);
+    }
+    for (Map.Entry<ReservationInterval, Resource> r : allocations.entrySet()) {
       resourcesOverTime.addInterval(r.getKey(), r.getValue());
     }
   }
@@ -133,17 +141,33 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
   }
 
   @Override
-  public RLESparseResourceAllocation getResourcesOverTime(){
+  public RLESparseResourceAllocation getResourcesOverTime() {
     return resourcesOverTime;
   }
 
   @Override
+  public RLESparseResourceAllocation getResourcesOverTime(long start,
+      long end) {
+    return resourcesOverTime.getRangeOverlapping(start, end);
+  }
+
+  @Override
+  public long getPeriodicity() {
+    return periodicity;
+  }
+
+  @Override
+  public void setPeriodicity(long period) {
+    periodicity = period;
+  }
+
+  @Override
   public String toString() {
     StringBuilder sBuf = new StringBuilder();
     sBuf.append(getReservationId()).append(" user:").append(getUser())
         .append(" startTime: ").append(getStartTime()).append(" endTime: ")
-        .append(getEndTime()).append(" alloc:\n[")
-        .append(resourcesOverTime.toString()).append("] ");
+        .append(getEndTime()).append(" Periodiciy: ").append(periodicity)
+        .append(" alloc:\n[").append(resourcesOverTime.toString()).append("] ");
     return sBuf.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
index 55f1d00..49d4702 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -40,7 +40,7 @@ public class NoOverCommitPolicy implements SharingPolicy {
 
     RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
         reservation.getUser(), reservation.getReservationId(),
-        reservation.getStartTime(), reservation.getEndTime());
+        reservation.getStartTime(), reservation.getEndTime(), 0);
 
     // test the reservation does not exceed what is available
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
index 8e3be8b3..7bc44f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
@@ -18,47 +18,94 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
- * This data structure stores a periodic RLESparseResourceAllocation.
+ * This data structure stores a periodic {@link RLESparseResourceAllocation}.
  * Default period is 1 day (86400000ms).
  */
-public class PeriodicRLESparseResourceAllocation extends
-    RLESparseResourceAllocation {
+public class PeriodicRLESparseResourceAllocation
+    extends RLESparseResourceAllocation {
 
   // Log
-  private static final Logger LOG = LoggerFactory
-      .getLogger(PeriodicRLESparseResourceAllocation.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PeriodicRLESparseResourceAllocation.class);
 
   private long timePeriod;
 
   /**
    * Constructor.
    *
-   * @param rleVector {@link RLESparseResourceAllocation} with the run-length
-              encoded data.
+   * @param resourceCalculator {@link ResourceCalculator} the resource
+   *          calculator to use.
    * @param timePeriod Time period in milliseconds.
    */
   public PeriodicRLESparseResourceAllocation(
-      RLESparseResourceAllocation rleVector, Long timePeriod) {
-    super(rleVector.getCumulative(), rleVector.getResourceCalculator());
+      ResourceCalculator resourceCalculator, Long timePeriod) {
+    super(resourceCalculator);
     this.timePeriod = timePeriod;
   }
 
   /**
    * Constructor. Default time period set to 1 day.
    *
+   * @param resourceCalculator {@link ResourceCalculator} the resource
+   *          calculator to use.
+   */
+  public PeriodicRLESparseResourceAllocation(
+      ResourceCalculator resourceCalculator) {
+    this(resourceCalculator,
+        YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
+  }
+
+  /**
+   * Constructor.
+   *
    * @param rleVector {@link RLESparseResourceAllocation} with the run-length
-              encoded data.
+   *          encoded data.
+   * @param timePeriod Time period in milliseconds.
    */
+  @VisibleForTesting
   public PeriodicRLESparseResourceAllocation(
-      RLESparseResourceAllocation rleVector) {
-    this(rleVector, 86400000L);
+      RLESparseResourceAllocation rleVector, Long timePeriod) {
+    super(rleVector.getCumulative(), rleVector.getResourceCalculator());
+    this.timePeriod = timePeriod;
+
+    // make sure the PeriodicRLE is zero-based, and handles wrap-around
+    long delta = (getEarliestStartTime() % timePeriod - getEarliestStartTime());
+    shift(delta);
+
+    List<Long> toRemove = new ArrayList<>();
+    Map<Long, Resource> toAdd = new TreeMap<>();
+
+    for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
+      if (entry.getKey() > timePeriod) {
+        toRemove.add(entry.getKey());
+        if (entry.getValue() != null) {
+          toAdd.put(timePeriod, entry.getValue());
+          long prev = entry.getKey() % timePeriod;
+          toAdd.put(prev, this.getCapacityAtTime(prev));
+          toAdd.put(0L, entry.getValue());
+        }
+      }
+    }
+    for (Long l : toRemove) {
+      cumulativeCapacity.remove(l);
+    }
+    cumulativeCapacity.putAll(toAdd);
   }
 
   /**
@@ -78,24 +125,25 @@ public class PeriodicRLESparseResourceAllocation extends
    * The interval may include 0, but the end time must be strictly less than
    * timePeriod.
    *
-   * @param interval {@link ReservationInterval} to which the specified
-   *          resource is to be added.
+   * @param interval {@link ReservationInterval} to which the specified resource
+   *          is to be added.
    * @param resource {@link Resource} to be added to the interval specified.
    * @return true if addition is successful, false otherwise
    */
-  public boolean addInterval(ReservationInterval interval,
-      Resource resource) {
+  public boolean addInterval(ReservationInterval interval, Resource resource) {
     long startTime = interval.getStartTime();
     long endTime = interval.getEndTime();
+
     if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) {
       return super.addInterval(interval, resource);
     } else {
-      LOG.info("Cannot set capacity beyond end time: " + timePeriod);
+      LOG.info("Cannot set capacity beyond end time: " + timePeriod + " was ("
+          + interval.toString() + ")");
       return false;
     }
   }
 
-   /**
+  /**
    * Removes a resource for the specified interval.
    *
    * @param interval the {@link ReservationInterval} for which the resource is
@@ -103,14 +151,15 @@ public class PeriodicRLESparseResourceAllocation extends
    * @param resource the {@link Resource} to be removed.
    * @return true if removal is successful, false otherwise
    */
-  public boolean removeInterval(
-      ReservationInterval interval, Resource resource) {
+  public boolean removeInterval(ReservationInterval interval,
+      Resource resource) {
     long startTime = interval.getStartTime();
     long endTime = interval.getEndTime();
     // If the resource to be subtracted is less than the minimum resource in
     // the range, abort removal to avoid negative capacity.
-    if (!Resources.fitsIn(
-        resource, super.getMinimumCapacityInInterval(interval))) {
+    // TODO revisit decrementing endTime
+    if (!Resources.fitsIn(resource, getMinimumCapacityInInterval(
+        new ReservationInterval(startTime, endTime - 1)))) {
       LOG.info("Request to remove more resources than what is available");
       return false;
     }
@@ -125,17 +174,16 @@ public class PeriodicRLESparseResourceAllocation extends
   /**
    * Get maximum capacity at periodic offsets from the specified time.
    *
-   * @param tick UTC time base from which offsets are specified for finding
-   *          the maximum capacity.
-   * @param period periodic offset at which capacities are evaluted.
+   * @param tick UTC time base from which offsets are specified for finding the
+   *          maximum capacity.
+   * @param period periodic offset at which capacities are evaluated.
    * @return the maximum {@link Resource} across the specified time instants.
    * @return true if removal is successful, false otherwise
    */
   public Resource getMaximumPeriodicCapacity(long tick, long period) {
     Resource maxResource;
     if (period < timePeriod) {
-      maxResource =
-          super.getMaximumPeriodicCapacity(tick % timePeriod, period);
+      maxResource = super.getMaximumPeriodicCapacity(tick % timePeriod, period);
     } else {
       // if period is greater than the length of PeriodicRLESparseAllocation,
       // only a single value exists in this interval.
@@ -164,4 +212,30 @@ public class PeriodicRLESparseResourceAllocation extends
     return ret.toString();
   }
 
+  @Override
+  public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
+    NavigableMap<Long, Resource> unrolledMap = new TreeMap<>();
+    readLock.lock();
+    try {
+      long relativeStart = (start >= 0) ? start % timePeriod : 0;
+      NavigableMap<Long, Resource> cumulativeMap = this.getCumulative();
+      Long previous = cumulativeMap.floorKey(relativeStart);
+      previous = (previous != null) ? previous : 0;
+      for (long i = 0; i <= (end - start) / timePeriod; i++) {
+        for (Map.Entry<Long, Resource> e : cumulativeMap.entrySet()) {
+          long curKey = e.getKey() + (i * timePeriod);
+          if (curKey >= previous && (start + curKey - relativeStart) <= end) {
+            unrolledMap.put(curKey, e.getValue());
+          }
+        }
+      }
+      RLESparseResourceAllocation rle =
+          new RLESparseResourceAllocation(unrolledMap, getResourceCalculator());
+      rle.shift(start - relativeStart);
+      return rle;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
index 504a250..9afa324 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
@@ -28,54 +28,58 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
 public interface PlanEdit extends PlanContext, PlanView {
 
   /**
-   * Add a new {@link ReservationAllocation} to the plan
+   * Add a new {@link ReservationAllocation} to the plan.
    * 
    * @param reservation the {@link ReservationAllocation} to be added to the
    *          plan
    * @param isRecovering flag to indicate if reservation is being added as part
    *          of failover or not
    * @return true if addition is successful, false otherwise
+   * @throws PlanningException if addition is unsuccessful
    */
-  public boolean addReservation(ReservationAllocation reservation,
+  boolean addReservation(ReservationAllocation reservation,
       boolean isRecovering) throws PlanningException;
 
   /**
    * Updates an existing {@link ReservationAllocation} in the plan. This is
-   * required for re-negotiation
+   * required for re-negotiation.
    * 
    * @param reservation the {@link ReservationAllocation} to be updated the plan
    * @return true if update is successful, false otherwise
+   * @throws PlanningException if update is unsuccessful
    */
-  public boolean updateReservation(ReservationAllocation reservation)
+  boolean updateReservation(ReservationAllocation reservation)
       throws PlanningException;
 
   /**
    * Delete an existing {@link ReservationAllocation} from the plan identified
    * uniquely by its {@link ReservationId}. This will generally be used for
-   * garbage collection
+   * garbage collection.
    * 
    * @param reservationID the {@link ReservationAllocation} to be deleted from
    *          the plan identified uniquely by its {@link ReservationId}
    * @return true if delete is successful, false otherwise
+   * @throws PlanningException if deletion is unsuccessful
    */
-  public boolean deleteReservation(ReservationId reservationID)
+  boolean deleteReservation(ReservationId reservationID)
       throws PlanningException;
 
   /**
    * Method invoked to garbage collect old reservations. It cleans up expired
-   * reservations that have fallen out of the sliding archival window
+   * reservations that have fallen out of the sliding archival window.
    * 
    * @param tick the current time from which the archival window is computed
+   * @throws PlanningException if archival is unsuccessful
    */
-  public void archiveCompletedReservations(long tick) throws PlanningException;
+  void archiveCompletedReservations(long tick) throws PlanningException;
 
   /**
    * Sets the overall capacity in terms of {@link Resource} assigned to this
-   * plan
+   * plan.
    * 
    * @param capacity the overall capacity in terms of {@link Resource} assigned
    *          to this plan
    */
-  public void setTotalCapacity(Resource capacity);
+  void setTotalCapacity(Resource capacity);
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index 2767993..4035f68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -17,50 +17,50 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import java.util.Set;
+
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 
-import java.util.Set;
-
 /**
  * This interface provides a read-only view on the allocations made in this
  * plan. This methods are used for example by {@code ReservationAgent}s to
  * determine the free resources in a certain point in time, and by
  * PlanFollowerPolicy to publish this plan to the scheduler.
  */
-public interface PlanView extends PlanContext {
+interface PlanView extends PlanContext {
 
   /**
    * Return a set of {@link ReservationAllocation} identified by the user who
    * made the reservation.
    *
    * @param reservationID the unqiue id to identify the
-   * {@link ReservationAllocation}
+   *          {@link ReservationAllocation}
    * @param interval the time interval used to retrieve the reservation
-   *                 allocations from. Only reservations with start time no
-   *                 greater than the interval end time, and end time no less
-   *                 than the interval start time will be selected.
+   *          allocations from. Only reservations with start time no greater
+   *          than the interval end time, and end time no less than the interval
+   *          start time will be selected.
    * @param user the user to retrieve the reservation allocation from.
    * @return a set of {@link ReservationAllocation} identified by the user who
-   * made the reservation
+   *         made the reservation
    */
-  Set<ReservationAllocation> getReservations(ReservationId
-                    reservationID, ReservationInterval interval, String user);
+  Set<ReservationAllocation> getReservations(ReservationId reservationID,
+      ReservationInterval interval, String user);
 
   /**
    * Return a set of {@link ReservationAllocation} identified by any user.
    *
    * @param reservationID the unqiue id to identify the
-   * {@link ReservationAllocation}
+   *          {@link ReservationAllocation}
    * @param interval the time interval used to retrieve the reservation
-   *                 allocations from. Only reservations with start time no
-   *                 greater than the interval end time, and end time no less
-   *                 than the interval start time will be selected.
+   *          allocations from. Only reservations with start time no greater
+   *          than the interval end time, and end time no less than the interval
+   *          start time will be selected.
    * @return a set of {@link ReservationAllocation} identified by any user
    */
   Set<ReservationAllocation> getReservations(ReservationId reservationID,
-                    ReservationInterval interval);
+      ReservationInterval interval);
 
   /**
    * Return a {@link ReservationAllocation} identified by its
@@ -70,7 +70,7 @@ public interface PlanView extends PlanContext {
    *          {@link ReservationAllocation}
    * @return {@link ReservationAllocation} identified by the specified id
    */
-  public ReservationAllocation getReservationById(ReservationId reservationID);
+  ReservationAllocation getReservationById(ReservationId reservationID);
 
   /**
    * Return a set of {@link ReservationAllocation} that belongs to a certain
@@ -78,11 +78,10 @@ public interface PlanView extends PlanContext {
    *
    * @param user the user being considered
    * @param t the instant in time being considered
-   * @return set of active {@link ReservationAllocation}s for this
-   *         user at this time
+   * @return set of active {@link ReservationAllocation}s for this user at this
+   *         time
    */
-  public Set<ReservationAllocation> getReservationByUserAtTime(String user,
-      long t);
+  Set<ReservationAllocation> getReservationByUserAtTime(String user, long t);
 
   /**
    * Gets all the active reservations at the specified point of time
@@ -91,14 +90,14 @@ public interface PlanView extends PlanContext {
    *          requested
    * @return set of active reservations at the specified time
    */
-  public Set<ReservationAllocation> getReservationsAtTime(long tick);
+  Set<ReservationAllocation> getReservationsAtTime(long tick);
 
   /**
    * Gets all the reservations in the plan
    * 
    * @return set of all reservations handled by this Plan
    */
-  public Set<ReservationAllocation> getAllReservations();
+  Set<ReservationAllocation> getAllReservations();
 
   /**
    * Returns the total {@link Resource} reserved for all users at the specified
@@ -126,61 +125,68 @@ public interface PlanView extends PlanContext {
    * 
    * @return the time (UTC in ms) at which the first reservation starts
    */
-  public long getEarliestStartTime();
+  long getEarliestStartTime();
 
   /**
    * Returns the time (UTC in ms) at which the last reservation terminates
    *
    * @return the time (UTC in ms) at which the last reservation terminates
    */
-  public long getLastEndTime();
+  long getLastEndTime();
 
   /**
    * This method returns the amount of resources available to a given user
    * (optionally if removing a certain reservation) over the start-end time
-   * range.
+   * range. If the request is periodic (period is non-zero) we return the
+   * minimum amount of resources available to periodic reservations (in all
+   * "period" windows within the system maxPeriod / LCM).
    *
-   * @param user
-   * @param oldId
-   * @param start
-   * @param end
+   * @param user the user being considered
+   * @param oldId the identifier of the existing reservation
+   * @param start start of the time interval.
+   * @param end end of the time interval.
+   * @param period the ms periodicity for this request (loop and pick min till
+   *          maxPeriodicity)
    * @return a view of the plan as it is available to this user
-   * @throws PlanningException
+   * @throws PlanningException if operation is unsuccessful
    */
-  public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
-      ReservationId oldId, long start, long end) throws PlanningException;
+  RLESparseResourceAllocation getAvailableResourceOverTime(String user,
+      ReservationId oldId, long start, long end, long period)
+      throws PlanningException;
 
   /**
    * This method returns a RLE encoded view of the user reservation count
    * utilization between start and end time.
    *
-   * @param user
-   * @param start
-   * @param end
+   * @param user the user being considered
+   * @param start start of the time interval.
+   * @param end end of the time interval.
    * @return RLE encoded view of reservation used over time
    */
-  public RLESparseResourceAllocation getReservationCountForUserOverTime(
-      String user, long start, long end);
+  RLESparseResourceAllocation getReservationCountForUserOverTime(String user,
+      long start, long end);
 
   /**
    * This method returns a RLE encoded view of the user reservation utilization
    * between start and end time.
    *
-   * @param user
-   * @param start
-   * @param end
+   * @param user the user being considered
+   * @param start start of the time interval.
+   * @param end end of the time interval.
    * @return RLE encoded view of resources used over time
    */
-  public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
+  RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
       long start, long end);
 
   /**
    * Get the cumulative load over a time interval.
    *
-   * @param start Start of the time interval.
-   * @param end End of the time interval.
+   * @param start start of the time interval.
+   * @param end end of the time interval.
    * @return RLE sparse allocation.
+   * @throws PlanningException if operation is unsuccessful
    */
-  RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end);
+  RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end)
+      throws PlanningException;
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/3] hadoop git commit: Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem.

Posted by su...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 658387b..100d38c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -18,8 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import java.io.IOException;
-import java.io.StringWriter;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -33,8 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.gson.stream.JsonWriter;
-
 /**
  * This is a run length encoded sparse data structure that maintains resource
  * allocations over time.
@@ -44,12 +41,14 @@ public class RLESparseResourceAllocation {
   private static final int THRESHOLD = 100;
   private static final Resource ZERO_RESOURCE = Resources.none();
 
-  private NavigableMap<Long, Resource> cumulativeCapacity =
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected NavigableMap<Long, Resource> cumulativeCapacity =
       new TreeMap<Long, Resource>();
 
   private final ReentrantReadWriteLock readWriteLock =
       new ReentrantReadWriteLock();
-  private final Lock readLock = readWriteLock.readLock();
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected final Lock readLock = readWriteLock.readLock();
   private final Lock writeLock = readWriteLock.writeLock();
 
   private final ResourceCalculator resourceCalculator;
@@ -236,34 +235,6 @@ public class RLESparseResourceAllocation {
   }
 
   /**
-   * Returns the JSON string representation of the current resources allocated
-   * over time.
-   *
-   * @return the JSON string representation of the current resources allocated
-   *         over time
-   */
-  public String toMemJSONString() {
-    StringWriter json = new StringWriter();
-    JsonWriter jsonWriter = new JsonWriter(json);
-    readLock.lock();
-    try {
-      jsonWriter.beginObject();
-      // jsonWriter.name("timestamp").value("resource");
-      for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
-        jsonWriter.name(r.getKey().toString()).value(r.getValue().toString());
-      }
-      jsonWriter.endObject();
-      jsonWriter.close();
-      return json.toString();
-    } catch (IOException e) {
-      // This should not happen
-      return "";
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  /**
    * Returns the representation of the current resources allocated over time as
    * an interval map (in the defined non-null range).
    *
@@ -304,7 +275,7 @@ public class RLESparseResourceAllocation {
   public NavigableMap<Long, Resource> getCumulative() {
     readLock.lock();
     try {
-      return cumulativeCapacity;
+      return Collections.unmodifiableNavigableMap(cumulativeCapacity);
     } finally {
       readLock.unlock();
     }
@@ -437,8 +408,8 @@ public class RLESparseResourceAllocation {
       Resource val = Resources.negate(e.getValue());
       // test for negative value and throws
       if (operator == RLEOperator.subtractTestNonNegative
-          && (Resources.fitsIn(val, ZERO_RESOURCE) &&
-              !Resources.equals(val, ZERO_RESOURCE))) {
+          && (Resources.fitsIn(val, ZERO_RESOURCE)
+              && !Resources.equals(val, ZERO_RESOURCE))) {
         throw new PlanningException(
             "RLESparseResourceAllocation: merge failed as the "
                 + "resulting RLESparseResourceAllocation would be negative");
@@ -504,22 +475,29 @@ public class RLESparseResourceAllocation {
 
   }
 
+  /**
+   * Get a {@link RLESparseResourceAllocation} view of the {@link Resource}
+   * allocations between the specified start and end times.
+   *
+   * @param start the time from which the {@link Resource} allocations are
+   *          required
+   * @param end the time up to which the {@link Resource} allocations are
+   *          required
+   * @return the overlapping allocations
+   */
   public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
     readLock.lock();
     try {
       NavigableMap<Long, Resource> a = this.getCumulative();
-
       if (a != null && !a.isEmpty()) {
         // include the portion of previous entry that overlaps start
         if (start > a.firstKey()) {
           long previous = a.floorKey(start);
           a = a.tailMap(previous, true);
         }
-
         if (end < a.lastKey()) {
           a = a.headMap(end, true);
         }
-
       }
       RLESparseResourceAllocation ret =
           new RLESparseResourceAllocation(a, resourceCalculator);
@@ -527,7 +505,33 @@ public class RLESparseResourceAllocation {
     } finally {
       readLock.unlock();
     }
+  }
 
+  /**
+   * This method shifts all the timestamps of the {@link Resource} entries by
+   * the specified "delta".
+   *
+   * @param delta the time by which to shift the {@link Resource} allocations
+   */
+  public void shift(long delta) {
+    writeLock.lock();
+    try {
+      TreeMap<Long, Resource> newCum = new TreeMap<>();
+      long start;
+      for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
+        if (delta > 0) {
+          start = (entry.getKey() == Long.MAX_VALUE) ? Long.MAX_VALUE
+              : entry.getKey() + delta;
+        } else {
+          start = (entry.getKey() == Long.MIN_VALUE) ? Long.MIN_VALUE
+              : entry.getKey() + delta;
+        }
+        newCum.put(start, entry.getValue());
+      }
+      cumulativeCapacity = newCum;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   /**
@@ -541,8 +545,8 @@ public class RLESparseResourceAllocation {
   /**
    * Get the maximum capacity across specified time instances. The search-space
    * is specified using the starting value, tick, and the periodic interval for
-   * search. Maximum resource allocation across tick, tick + period,
-   * tick + 2 * period,..., tick + n * period .. is returned.
+   * search. Maximum resource allocation across tick, tick + period, tick + 2 *
+   * period,..., tick + n * period .. is returned.
    *
    * @param tick the starting time instance
    * @param period interval at which capacity is evaluated
@@ -550,14 +554,19 @@ public class RLESparseResourceAllocation {
    */
   public Resource getMaximumPeriodicCapacity(long tick, long period) {
     Resource maxCapacity = ZERO_RESOURCE;
-    if (!cumulativeCapacity.isEmpty()) {
-      Long lastKey = cumulativeCapacity.lastKey();
-      for (long t = tick; t <= lastKey; t = t + period) {
-        maxCapacity = Resources.componentwiseMax(maxCapacity,
-            cumulativeCapacity.floorEntry(t).getValue());
+    readLock.lock();
+    try {
+      if (!cumulativeCapacity.isEmpty()) {
+        Long lastKey = cumulativeCapacity.lastKey();
+        for (long t = tick; t <= lastKey; t = t + period) {
+          maxCapacity = Resources.componentwiseMax(maxCapacity,
+              cumulativeCapacity.floorEntry(t).getValue());
+        }
       }
+      return maxCapacity;
+    } finally {
+      readLock.unlock();
     }
-    return maxCapacity;
   }
 
   /**
@@ -567,17 +576,17 @@ public class RLESparseResourceAllocation {
    * @return minimum resource allocation
    */
   public Resource getMinimumCapacityInInterval(ReservationInterval interval) {
-    Resource minCapacity = Resource.newInstance(
-        Integer.MAX_VALUE, Integer.MAX_VALUE);
+    Resource minCapacity =
+        Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE);
     long start = interval.getStartTime();
     long end = interval.getEndTime();
     NavigableMap<Long, Resource> capacityRange =
-        this.getRangeOverlapping(start, end).getCumulative();
+        getRangeOverlapping(start, end).getCumulative();
     if (!capacityRange.isEmpty()) {
       for (Map.Entry<Long, Resource> entry : capacityRange.entrySet()) {
         if (entry.getValue() != null) {
-          minCapacity = Resources.componentwiseMin(minCapacity,
-              entry.getValue());
+          minCapacity =
+              Resources.componentwiseMin(minCapacity, entry.getValue());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
index 0da95ac..bb4a7fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -24,14 +24,16 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A ReservationAllocation represents a concrete allocation of resources over
  * time that satisfy a certain {@link ReservationDefinition}. This is used
  * internally by a {@link Plan} to store information about how each of the
  * accepted {@link ReservationDefinition} have been allocated.
  */
-public interface ReservationAllocation extends
-    Comparable<ReservationAllocation> {
+public interface ReservationAllocation
+    extends Comparable<ReservationAllocation> {
 
   /**
    * Returns the unique identifier {@link ReservationId} that represents the
@@ -40,28 +42,28 @@ public interface ReservationAllocation extends
    * @return reservationId the unique identifier {@link ReservationId} that
    *         represents the reservation
    */
-  public ReservationId getReservationId();
+  ReservationId getReservationId();
 
   /**
    * Returns the original {@link ReservationDefinition} submitted by the client
    * 
    * @return the {@link ReservationDefinition} submitted by the client
    */
-  public ReservationDefinition getReservationDefinition();
+  ReservationDefinition getReservationDefinition();
 
   /**
    * Returns the time at which the reservation is activated.
    * 
    * @return the time at which the reservation is activated
    */
-  public long getStartTime();
+  long getStartTime();
 
   /**
    * Returns the time at which the reservation terminates.
    * 
    * @return the time at which the reservation terminates
    */
-  public long getEndTime();
+  long getEndTime();
 
   /**
    * Returns the map of resources requested against the time interval for which
@@ -70,28 +72,28 @@ public interface ReservationAllocation extends
    * @return the allocationRequests the map of resources requested against the
    *         time interval for which they were
    */
-  public Map<ReservationInterval, Resource> getAllocationRequests();
+  Map<ReservationInterval, Resource> getAllocationRequests();
 
   /**
    * Return a string identifying the plan to which the reservation belongs
    * 
    * @return the plan to which the reservation belongs
    */
-  public String getPlanName();
+  String getPlanName();
 
   /**
    * Returns the user who requested the reservation
    * 
    * @return the user who requested the reservation
    */
-  public String getUser();
+  String getUser();
 
   /**
    * Returns whether the reservation has gang semantics or not
    * 
    * @return true if there is a gang request, false otherwise
    */
-  public boolean containsGangs();
+  boolean containsGangs();
 
   /**
    * Sets the time at which the reservation was accepted by the system
@@ -99,14 +101,14 @@ public interface ReservationAllocation extends
    * @param acceptedAt the time at which the reservation was accepted by the
    *          system
    */
-  public void setAcceptanceTimestamp(long acceptedAt);
+  void setAcceptanceTimestamp(long acceptedAt);
 
   /**
    * Returns the time at which the reservation was accepted by the system
    * 
    * @return the time at which the reservation was accepted by the system
    */
-  public long getAcceptanceTime();
+  long getAcceptanceTime();
 
   /**
    * Returns the capacity represented by cumulative resources reserved by the
@@ -116,12 +118,42 @@ public interface ReservationAllocation extends
    *          requested
    * @return the resources reserved at the specified time
    */
-  public Resource getResourcesAtTime(long tick);
+  Resource getResourcesAtTime(long tick);
+
+  /**
+   * Return an RLE representation of used resources.
+   *
+   * @return an RLE encoding of resources allocated over time.
+   */
+  RLESparseResourceAllocation getResourcesOverTime();
+
 
   /**
    * Return a RLE representation of used resources.
+   *
+   * @param start start of the time interval.
+   * @param end end of the time interval.
    * @return a RLE encoding of resources allocated over time.
    */
-  public RLESparseResourceAllocation getResourcesOverTime();
+  RLESparseResourceAllocation getResourcesOverTime(long start, long end);
+
+  /**
+   * Get the periodicity of this reservation representing the time period of the
+   * periodic job. Period is represented in milliseconds for periodic jobs.
+   * Period is 0 for non-periodic jobs.
+   *
+   * @return periodicity of this reservation
+   */
+  long getPeriodicity();
+
+  /**
+   * Set the periodicity of this reservation representing the time period of the
+   * periodic job. Period is represented in milliseconds for periodic jobs.
+   * Period is 0 for non-periodic jobs.
+   *
+   * @param period periodicity of this reservation
+   */
+  @VisibleForTesting
+  void setPeriodicity(long period);
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
index 027d066..a66d222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
@@ -44,6 +44,8 @@ public class ReservationInputValidator {
 
   /**
    * Utility class to validate reservation requests.
+   *
+   * @param clock the {@link Clock} to use
    */
   public ReservationInputValidator(Clock clock) {
     this.clock = clock;
@@ -53,22 +55,21 @@ public class ReservationInputValidator {
       ReservationId reservationId, String auditConstant) throws YarnException {
     // check if the reservation id is valid
     if (reservationId == null) {
-      String message =
-          "Missing reservation id."
-              + " Please try again by specifying a reservation id.";
+      String message = "Missing reservation id."
+          + " Please try again by specifying a reservation id.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
     }
     String queue = reservationSystem.getQueueForReservation(reservationId);
     String nullQueueErrorMessage =
-            "The specified reservation with ID: " + reservationId
-                    + " is unknown. Please try again with a valid reservation.";
+        "The specified reservation with ID: " + reservationId
+            + " is unknown. Please try again with a valid reservation.";
     String nullPlanErrorMessage = "The specified reservation: " + reservationId
-                            + " is not associated with any valid plan."
-                            + " Please try again with a valid reservation.";
+        + " is not associated with any valid plan."
+        + " Please try again with a valid reservation.";
     return getPlanFromQueue(reservationSystem, queue, auditConstant,
-            nullQueueErrorMessage, nullPlanErrorMessage);
+        nullQueueErrorMessage, nullPlanErrorMessage);
   }
 
   private void validateReservationDefinition(ReservationId reservationId,
@@ -77,17 +78,15 @@ public class ReservationInputValidator {
     String message = "";
     // check if deadline is in the past
     if (contract == null) {
-      message =
-          "Missing reservation definition."
-              + " Please try again by specifying a reservation definition.";
+      message = "Missing reservation definition."
+          + " Please try again by specifying a reservation definition.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
     }
     if (contract.getDeadline() <= clock.getTime()) {
-      message =
-          "The specified deadline: " + contract.getDeadline()
-              + " is the past. Please try again with deadline in the future.";
+      message = "The specified deadline: " + contract.getDeadline()
+          + " is the past. Please try again with deadline in the future.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
@@ -95,18 +94,16 @@ public class ReservationInputValidator {
     // Check if at least one RR has been specified
     ReservationRequests resReqs = contract.getReservationRequests();
     if (resReqs == null) {
-      message =
-          "No resources have been specified to reserve."
-              + "Please try again by specifying the resources to reserve.";
+      message = "No resources have been specified to reserve."
+          + "Please try again by specifying the resources to reserve.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
     }
     List<ReservationRequest> resReq = resReqs.getReservationResources();
     if (resReq == null || resReq.isEmpty()) {
-      message =
-          "No resources have been specified to reserve."
-              + " Please try again by specifying the resources to reserve.";
+      message = "No resources have been specified to reserve."
+          + " Please try again by specifying the resources to reserve.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
@@ -123,22 +120,18 @@ public class ReservationInputValidator {
       } else {
         minDuration += rr.getDuration();
       }
-      maxGangSize =
-          Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(),
-              maxGangSize,
-              Resources.multiply(rr.getCapability(), rr.getConcurrency()));
+      maxGangSize = Resources.max(plan.getResourceCalculator(),
+          plan.getTotalCapacity(), maxGangSize,
+          Resources.multiply(rr.getCapability(), rr.getConcurrency()));
     }
     // verify the allocation is possible (skip for ANY)
     long duration = contract.getDeadline() - contract.getArrival();
-    if (duration < minDuration
-        && type != ReservationRequestInterpreter.R_ANY) {
-      message =
-          "The time difference ("
-              + (duration)
-              + ") between arrival (" + contract.getArrival() + ") "
-              + "and deadline (" + contract.getDeadline() + ") must "
-              + " be greater or equal to the minimum resource duration ("
-              + minDuration + ")";
+    if (duration < minDuration && type != ReservationRequestInterpreter.R_ANY) {
+      message = "The time difference (" + (duration) + ") between arrival ("
+          + contract.getArrival() + ") " + "and deadline ("
+          + contract.getDeadline() + ") must "
+          + " be greater or equal to the minimum resource duration ("
+          + minDuration + ")";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
@@ -148,10 +141,9 @@ public class ReservationInputValidator {
     if (Resources.greaterThan(plan.getResourceCalculator(),
         plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
         && type != ReservationRequestInterpreter.R_ANY) {
-      message =
-          "The size of the largest gang in the reservation definition ("
-              + maxGangSize + ") exceed the capacity available ("
-              + plan.getTotalCapacity() + " )";
+      message = "The size of the largest gang in the reservation definition ("
+          + maxGangSize + ") exceed the capacity available ("
+          + plan.getTotalCapacity() + " )";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
@@ -179,32 +171,32 @@ public class ReservationInputValidator {
     }
   }
 
-  private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
-          queue, String auditConstant) throws YarnException {
+  private Plan getPlanFromQueue(ReservationSystem reservationSystem,
+      String queue, String auditConstant) throws YarnException {
     String nullQueueErrorMessage = "The queue is not specified."
-            + " Please try again with a valid reservable queue.";
+        + " Please try again with a valid reservable queue.";
     String nullPlanErrorMessage = "The specified queue: " + queue
-            + " is not managed by reservation system."
-            + " Please try again with a valid reservable queue.";
+        + " is not managed by reservation system."
+        + " Please try again with a valid reservable queue.";
     return getPlanFromQueue(reservationSystem, queue, auditConstant,
-            nullQueueErrorMessage, nullPlanErrorMessage);
+        nullQueueErrorMessage, nullPlanErrorMessage);
   }
 
-  private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
-          queue, String auditConstant, String nullQueueErrorMessage,
-          String nullPlanErrorMessage) throws YarnException {
+  private Plan getPlanFromQueue(ReservationSystem reservationSystem,
+      String queue, String auditConstant, String nullQueueErrorMessage,
+      String nullPlanErrorMessage) throws YarnException {
     if (queue == null || queue.isEmpty()) {
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
-              "validate reservation input", "ClientRMService",
-              nullQueueErrorMessage);
+          "validate reservation input", "ClientRMService",
+          nullQueueErrorMessage);
       throw RPCUtil.getRemoteException(nullQueueErrorMessage);
     }
     // check if the associated plan is valid
     Plan plan = reservationSystem.getPlan(queue);
     if (plan == null) {
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
-              "validate reservation input", "ClientRMService",
-              nullPlanErrorMessage);
+          "validate reservation input", "ClientRMService",
+          nullPlanErrorMessage);
       throw RPCUtil.getRemoteException(nullPlanErrorMessage);
     }
     return plan;
@@ -222,22 +214,21 @@ public class ReservationInputValidator {
    * @param reservationId the {@link ReservationId} associated with the current
    *          request
    * @return the {@link Plan} to submit the request to
-   * @throws YarnException
+   * @throws YarnException if validation fails
    */
   public Plan validateReservationSubmissionRequest(
-      ReservationSystem reservationSystem,
-      ReservationSubmissionRequest request, ReservationId reservationId)
-      throws YarnException {
+      ReservationSystem reservationSystem, ReservationSubmissionRequest request,
+      ReservationId reservationId) throws YarnException {
     String message;
     if (reservationId == null) {
-      message = "Reservation id cannot be null. Please try again " +
-        "specifying a valid reservation id by creating a new reservation id.";
+      message = "Reservation id cannot be null. Please try again specifying "
+          + " a valid reservation id by creating a new reservation id.";
       throw RPCUtil.getRemoteException(message);
     }
     // Check if it is a managed queue
     String queue = request.getQueue();
     Plan plan = getPlanFromQueue(reservationSystem, queue,
-            AuditConstants.SUBMIT_RESERVATION_REQUEST);
+        AuditConstants.SUBMIT_RESERVATION_REQUEST);
 
     validateReservationDefinition(reservationId,
         request.getReservationDefinition(), plan,
@@ -255,15 +246,14 @@ public class ReservationInputValidator {
    * @param request the {@link ReservationUpdateRequest} defining the resources
    *          required over time for the request
    * @return the {@link Plan} to submit the request to
-   * @throws YarnException
+   * @throws YarnException if validation fails
    */
   public Plan validateReservationUpdateRequest(
       ReservationSystem reservationSystem, ReservationUpdateRequest request)
       throws YarnException {
     ReservationId reservationId = request.getReservationId();
-    Plan plan =
-        validateReservation(reservationSystem, reservationId,
-            AuditConstants.UPDATE_RESERVATION_REQUEST);
+    Plan plan = validateReservation(reservationSystem, reservationId,
+        AuditConstants.UPDATE_RESERVATION_REQUEST);
     validateReservationDefinition(reservationId,
         request.getReservationDefinition(), plan,
         AuditConstants.UPDATE_RESERVATION_REQUEST);
@@ -278,28 +268,26 @@ public class ReservationInputValidator {
    *
    * @param reservationSystem the {@link ReservationSystem} to validate against
    * @param request the {@link ReservationListRequest} defining search
-   *                parameters for reservations in the {@link ReservationSystem}
-   *                that is being validated against.
+   *          parameters for reservations in the {@link ReservationSystem} that
+   *          is being validated against.
    * @return the {@link Plan} to list reservations of.
-   * @throws YarnException
+   * @throws YarnException if validation fails
    */
   public Plan validateReservationListRequest(
-      ReservationSystem reservationSystem,
-      ReservationListRequest request)
+      ReservationSystem reservationSystem, ReservationListRequest request)
       throws YarnException {
     String queue = request.getQueue();
     if (request.getEndTime() < request.getStartTime()) {
-      String errorMessage = "The specified end time must be greater than " +
-              "the specified start time.";
+      String errorMessage = "The specified end time must be greater than "
+          + "the specified start time.";
       RMAuditLogger.logFailure("UNKNOWN",
-              AuditConstants.LIST_RESERVATION_REQUEST,
-              "validate list reservation input", "ClientRMService",
-              errorMessage);
+          AuditConstants.LIST_RESERVATION_REQUEST,
+          "validate list reservation input", "ClientRMService", errorMessage);
       throw RPCUtil.getRemoteException(errorMessage);
     }
     // Check if it is a managed queue
     return getPlanFromQueue(reservationSystem, queue,
-            AuditConstants.LIST_RESERVATION_REQUEST);
+        AuditConstants.LIST_RESERVATION_REQUEST);
   }
 
   /**
@@ -312,7 +300,7 @@ public class ReservationInputValidator {
    * @param request the {@link ReservationDeleteRequest} defining the resources
    *          required over time for the request
    * @return the {@link Plan} to submit the request to
-   * @throws YarnException
+   * @throws YarnException if validation fails
    */
   public Plan validateReservationDeleteRequest(
       ReservationSystem reservationSystem, ReservationDeleteRequest request)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
index 8b62972..a6c8fcf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import java.util.Map;
+
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -29,8 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
 
-import java.util.Map;
-
 /**
  * This interface is the one implemented by any system that wants to support
  * Reservations i.e. make {@code Resource} allocations in future. Implementors
@@ -57,7 +57,7 @@ public interface ReservationSystem extends Recoverable {
    * 
    * @param conf configuration
    * @param rmContext current context of the {@code ResourceManager}
-   * @throws YarnException
+   * @throws YarnException if initialization of the configured plan fails
    */
   void reinitialize(Configuration conf, RMContext rmContext)
       throws YarnException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
index e458055..cbf0f38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
@@ -38,7 +38,7 @@ public interface SharingPolicy {
    * @param planQueuePath the name of the queue for this plan
    * @param conf the system configuration
    */
-  public void init(String planQueuePath, ReservationSchedulerConfiguration conf);
+  void init(String planQueuePath, ReservationSchedulerConfiguration conf);
 
   /**
    * This method runs the policy validation logic, and return true/false on
@@ -51,7 +51,7 @@ public interface SharingPolicy {
    * @throws PlanningException if the policy is respected if we add this
    *           {@link ReservationAllocation} to the {@link Plan}
    */
-  public void validate(Plan plan, ReservationAllocation newAllocation)
+  void validate(Plan plan, ReservationAllocation newAllocation)
       throws PlanningException;
 
   /**
@@ -68,9 +68,13 @@ public interface SharingPolicy {
    * @param start the start time for the range we are querying
    * @param end the end time for the range we are querying
    * @param oldId (optional) the id of a reservation being updated
+   *
+   * @return the available resources expressed as a
+   *         {@link RLESparseResourceAllocation}
+   *
    * @throws PlanningException throws if the request is not valid
    */
-  public RLESparseResourceAllocation availableResources(
+  RLESparseResourceAllocation availableResources(
       RLESparseResourceAllocation available, Plan plan, String user,
       ReservationId oldId, long start, long end) throws PlanningException;
 
@@ -82,7 +86,6 @@ public interface SharingPolicy {
    * 
    * @return validWindow the window of validity considered by the policy.
    */
-  public long getValidWindow();
-
+  long getValidWindow();
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
index abac6ac..af0e712 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
@@ -34,7 +34,7 @@ public interface Planner {
    *
    * @param plan the {@link Plan} to replan
    * @param contracts the list of reservation requests
-   * @throws PlanningException
+   * @throws PlanningException if the operation is unsuccessful
    */
   public void plan(Plan plan, List<ReservationDefinition> contracts)
       throws PlanningException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
index 199bfa5..bbbf0d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -50,7 +50,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
    * @return whether the allocateUser function was successful or not
    *
    * @throws PlanningException if the session cannot be fitted into the plan
-   * @throws ContractValidationException
+   * @throws ContractValidationException if validation fails
    */
   protected boolean allocateUser(ReservationId reservationId, String user,
       Plan plan, ReservationDefinition contract,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
index ec6d9c0..8934b0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -50,7 +50,7 @@ public interface StageAllocator {
    *
    * @return The computed allocation (or null if the stage could not be
    *         allocated)
-   * @throws PlanningException
+   * @throws PlanningException if the operation is unsuccessful
    */
   Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
       RLESparseResourceAllocation planLoads,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
index da04336..d107487 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -69,7 +69,7 @@ public class StageAllocatorGreedy implements StageAllocator {
 
     RLESparseResourceAllocation netAvailable =
         plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
-            stageDeadline);
+            stageDeadline, 0);
 
     netAvailable =
         RLESparseResourceAllocation.merge(plan.getResourceCalculator(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
index ec83e02..ae7d91a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
@@ -83,9 +83,8 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
     int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
 
     // get available resources from plan
-    RLESparseResourceAllocation netRLERes =
-        plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
-            stageDeadline);
+    RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
+        user, oldId, stageEarliestStart, stageDeadline, 0);
 
     // remove plan modifications
     netRLERes =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index e45f58c..c014549 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -77,8 +77,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
     ResourceCalculator resCalc = plan.getResourceCalculator();
     Resource capacity = plan.getTotalCapacity();
 
-    RLESparseResourceAllocation netRLERes = plan
-        .getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline);
+    RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
+        user, oldId, stageArrival, stageDeadline, 0);
 
     long step = plan.getStep();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index e99842e..5337e06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -19,7 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anySetOf;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.FileWriter;
 import java.io.IOException;
@@ -76,7 +79,8 @@ public class ReservationSystemTestUtil {
       String reservationQ, long timeWindow, float instConstraint,
       float avgConstraint) {
 
-    ReservationSchedulerConfiguration realConf = new CapacitySchedulerConfiguration();
+    ReservationSchedulerConfiguration realConf =
+        new CapacitySchedulerConfiguration();
     ReservationSchedulerConfiguration conf = spy(realConf);
     when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
     when(conf.getInstantaneousMaxCapacity(reservationQ))
@@ -168,7 +172,6 @@ public class ReservationSystemTestUtil {
     scheduler.start();
     scheduler.reinitialize(conf, rmContext);
 
-
     Resource resource =
         ReservationSystemTestUtil.calculateClusterResource(numContainers);
     RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
@@ -184,10 +187,16 @@ public class ReservationSystemTestUtil {
 
   public static ReservationDefinition createSimpleReservationDefinition(
       long arrival, long deadline, long duration, int parallelism) {
+    return createSimpleReservationDefinition(arrival, deadline, duration,
+        parallelism, null);
+  }
+
+  public static ReservationDefinition createSimpleReservationDefinition(
+      long arrival, long deadline, long duration, int parallelism,
+      String recurrenceExpression) {
     // create a request with a single atomic ask
-    ReservationRequest r =
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-            parallelism, parallelism, duration);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), parallelism, parallelism, duration);
     ReservationDefinition rDef = new ReservationDefinitionPBImpl();
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setReservationResources(Collections.singletonList(r));
@@ -195,32 +204,31 @@ public class ReservationSystemTestUtil {
     rDef.setReservationRequests(reqs);
     rDef.setArrival(arrival);
     rDef.setDeadline(deadline);
+    if (recurrenceExpression != null) {
+      rDef.setRecurrenceExpression(recurrenceExpression);
+    }
     return rDef;
   }
 
   public static ReservationSubmissionRequest createSimpleReservationRequest(
       ReservationId reservationId, int numContainers, long arrival,
       long deadline, long duration) {
-    return createSimpleReservationRequest(reservationId, numContainers,
-        arrival, deadline, duration, Priority.UNDEFINED);
+    return createSimpleReservationRequest(reservationId, numContainers, arrival,
+        deadline, duration, Priority.UNDEFINED);
   }
 
   public static ReservationSubmissionRequest createSimpleReservationRequest(
       ReservationId reservationId, int numContainers, long arrival,
       long deadline, long duration, Priority priority) {
     // create a request with a single atomic ask
-    ReservationRequest r =
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-            numContainers, 1, duration);
-    ReservationRequests reqs =
-        ReservationRequests.newInstance(Collections.singletonList(r),
-            ReservationRequestInterpreter.R_ALL);
-    ReservationDefinition rDef =
-        ReservationDefinition.newInstance(arrival, deadline, reqs,
-            "testClientRMService#reservation", "0", priority);
-    ReservationSubmissionRequest request =
-        ReservationSubmissionRequest.newInstance(rDef,
-            reservationQ, reservationId);
+    ReservationRequest r = ReservationRequest
+        .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration);
+    ReservationRequests reqs = ReservationRequests.newInstance(
+        Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition rDef = ReservationDefinition.newInstance(arrival,
+        deadline, reqs, "testClientRMService#reservation", "0", priority);
+    ReservationSubmissionRequest request = ReservationSubmissionRequest
+        .newInstance(rDef, reservationQ, reservationId);
     return request;
   }
 
@@ -252,9 +260,9 @@ public class ReservationSystemTestUtil {
     return cs;
   }
 
-  @SuppressWarnings("rawtypes") public static void initializeRMContext(
-      int numContainers, AbstractYarnScheduler scheduler,
-      RMContext mockRMContext) {
+  @SuppressWarnings("rawtypes")
+  public static void initializeRMContext(int numContainers,
+      AbstractYarnScheduler scheduler, RMContext mockRMContext) {
 
     when(mockRMContext.getScheduler()).thenReturn(scheduler);
     Resource r = calculateClusterResource(numContainers);
@@ -262,26 +270,25 @@ public class ReservationSystemTestUtil {
   }
 
   public static RMContext createRMContext(Configuration conf) {
-    RMContext mockRmContext = Mockito.spy(
-        new RMContextImpl(null, null, null, null, null, null,
-            new RMContainerTokenSecretManager(conf),
-            new NMTokenSecretManagerInRM(conf),
-            new ClientToAMTokenSecretManagerInRM(), null));
+    RMContext mockRmContext = Mockito.spy(new RMContextImpl(null, null, null,
+        null, null, null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null));
 
     RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
     when(nlm.getQueueResource(any(String.class), anySetOf(String.class),
-            any(Resource.class))).thenAnswer(new Answer<Resource>() {
-      @Override public Resource answer(InvocationOnMock invocation)
-          throws Throwable {
-        Object[] args = invocation.getArguments();
-        return (Resource) args[2];
-      }
-    });
+        any(Resource.class))).thenAnswer(new Answer<Resource>() {
+          @Override
+          public Resource answer(InvocationOnMock invocation) throws Throwable {
+            Object[] args = invocation.getArguments();
+            return (Resource) args[2];
+          }
+        });
 
     when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
         .thenAnswer(new Answer<Resource>() {
-          @Override public Resource answer(InvocationOnMock invocation)
-              throws Throwable {
+          @Override
+          public Resource answer(InvocationOnMock invocation) throws Throwable {
             Object[] args = invocation.getArguments();
             return (Resource) args[1];
           }
@@ -304,9 +311,8 @@ public class ReservationSystemTestUtil {
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     conf.setCapacity(A, 10);
 
-    final String dedicated =
-        CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT
-            + reservationQ;
+    final String dedicated = CapacitySchedulerConfiguration.ROOT
+        + CapacitySchedulerConfiguration.DOT + reservationQ;
     conf.setCapacity(dedicated, 80);
     // Set as reservation queue
     conf.setReservable(dedicated, true);
@@ -405,26 +411,55 @@ public class ReservationSystemTestUtil {
 
   public static Map<ReservationInterval, Resource> generateAllocation(
       long startTime, long step, int[] alloc) {
+    return generateAllocation(startTime, step, alloc, null);
+  }
+
+  public static Map<ReservationInterval, Resource> generateAllocation(
+      long startTime, long step, int[] alloc, String recurrenceExpression) {
     Map<ReservationInterval, Resource> req = new TreeMap<>();
-    for (int i = 0; i < alloc.length; i++) {
-      req.put(new ReservationInterval(startTime + i * step,
-          startTime + (i + 1) * step), ReservationSystemUtil.toResource(
-          ReservationRequest
-              .newInstance(Resource.newInstance(1024, 1), alloc[i])));
+
+    long period = 0;
+    if (recurrenceExpression != null) {
+      period = Long.parseLong(recurrenceExpression);
+    }
+
+    long rStart;
+    long rEnd;
+    for (int j = 0; j < 86400000; j += period) {
+      for (int i = 0; i < alloc.length; i++) {
+        rStart = (startTime + i * step) + j * period;
+        rEnd = (startTime + (i + 1) * step) + j * period;
+        if (period > 0) {
+          rStart = rStart % period + j * period;
+          rEnd = rEnd % period + j * period;
+          if (rStart > rEnd) {
+            // skip wrap-around entry
+            continue;
+          }
+        }
+
+        req.put(new ReservationInterval(rStart, rEnd),
+            ReservationSystemUtil.toResource(ReservationRequest
+                .newInstance(Resource.newInstance(1024, 1), alloc[i])));
+
+      }
+      // execute only once if non-periodic
+      if (period == 0) {
+        break;
+      }
     }
     return req;
   }
 
-  public static RLESparseResourceAllocation
-      generateRLESparseResourceAllocation(int[] alloc, long[] timeSteps) {
+  public static RLESparseResourceAllocation generateRLESparseResourceAllocation(
+      int[] alloc, long[] timeSteps) {
     TreeMap<Long, Resource> allocationsMap = new TreeMap<>();
     for (int i = 0; i < alloc.length; i++) {
       allocationsMap.put(timeSteps[i],
           Resource.newInstance(alloc[i], alloc[i]));
     }
-    RLESparseResourceAllocation rleVector =
-        new RLESparseResourceAllocation(allocationsMap,
-            new DefaultResourceCalculator());
+    RLESparseResourceAllocation rleVector = new RLESparseResourceAllocation(
+        allocationsMap, new DefaultResourceCalculator());
     return rleVector;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org