You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:22 UTC

[07/33] YARN-1709. In-memory data structures used to track resources over time to enable reservations. (cherry picked from commit 0d8b2cd88b958b1e602fd4ea4078ef8d4742a7c3) (cherry picked from commit cf4b34282aafee9f6b09d3433c4de1ae4b359168)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63250ef9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
new file mode 100644
index 0000000..6dcd41f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -0,0 +1,477 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInMemoryPlan {
+
+  private String user = "yarn";
+  private String planName = "test-reservation";
+  private ResourceCalculator resCalc;
+  private Resource minAlloc;
+  private Resource maxAlloc;
+  private Resource totalCapacity;
+
+  private Clock clock;
+  private QueueMetrics queueMetrics;
+  private SharingPolicy policy;
+  private ReservationAgent agent;
+  private Planner replanner;
+
+  /** Creates the resource fixtures and a mock for each plan collaborator. */
+  @Before
+  public void setUp() throws PlanningException {
+    resCalc = new DefaultResourceCalculator();
+    minAlloc = Resource.newInstance(1024, 1);
+    maxAlloc = Resource.newInstance(64 * 1024, 20);
+    totalCapacity = Resource.newInstance(100 * 1024, 100);
+    clock = mock(Clock.class);
+    queueMetrics = mock(QueueMetrics.class);
+    policy = mock(SharingPolicy.class);
+    agent = mock(ReservationAgent.class); // was left null before
+    replanner = mock(Planner.class);
+    when(clock.getTime()).thenReturn(1L);
+  }
+
+  /** Clears the resource fixtures and the mocked collaborators. */
+  @After
+  public void tearDown() {
+    replanner = null;
+    policy = null;
+    queueMetrics = null;
+    clock = null;
+    totalCapacity = null;
+    maxAlloc = null;
+    minAlloc = null;
+    resCalc = null;
+  }
+
+  /**
+   * Adds a six-interval reservation of 10 containers each to a new plan and
+   * checks the plan-wide and per-user resources at every interval start.
+   */
+  @Test
+  public void testAddReservation() throws PlanningException {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    // A PlanningException propagates so that JUnit reports its stack trace
+    plan.addReservation(rAllocation);
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc.length; i++) {
+      Resource expected = Resource.newInstance(1024 * alloc[i], alloc[i]);
+      Assert.assertEquals(expected, plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(expected, plan.getConsumptionForUser(user, start + i));
+    }
+  }
+
+  /**
+   * Adds a reservation that carries no allocations and expects the plan to
+   * accept it; a PlanningException fails the test with its stack trace.
+   */
+  @Test
+  public void testAddEmptyReservation() throws PlanningException {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = {};
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    plan.addReservation(rAllocation);
+  }
+
+  /**
+   * Adds a reservation and then adds the very same allocation again. The
+   * second add must throw an IllegalArgumentException whose message ends
+   * with "already exists" and must leave the first reservation in place.
+   *
+   * @throws PlanningException if the plan raises it for either add
+   */
+  @Test
+  public void testAddReservationAlreadyExists() throws PlanningException {
+    // First add a reservation
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    plan.addReservation(rAllocation);
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc.length; i++) {
+      Resource expected = Resource.newInstance(1024 * alloc[i], alloc[i]);
+      Assert.assertEquals(expected, plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(expected, plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Try to add it again
+    try {
+      plan.addReservation(rAllocation);
+      Assert.fail("Add should fail as it already exists");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().endsWith("already exists"));
+    }
+    doAssertions(plan, rAllocation);
+  }
+
+  /**
+   * Adds a flat reservation that starts at 100 and then replaces it, under
+   * the same reservation id, by a stepped one that starts at 110. After the
+   * update the plan must report exactly the stepped allocation for the
+   * intervals starting at 110 through 115.
+   *
+   * @throws PlanningException if the plan rejects the add or the update
+   */
+  @Test
+  public void testUpdateReservation() throws PlanningException {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    // First add a reservation
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    // A PlanningException propagates so that JUnit reports its stack trace
+    plan.addReservation(rAllocation);
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc.length; i++) {
+      Resource expected = Resource.newInstance(1024 * alloc[i], alloc[i]);
+      Assert.assertEquals(expected, plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(expected, plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Now update it
+    start = 110;
+    int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 };
+    // isStep = true: interval i asks for updatedAlloc[i] + i containers
+    allocations = generateAllocation(start, updatedAlloc, true);
+    rDef =
+        createSimpleReservationDefinition(start, start + updatedAlloc.length,
+            updatedAlloc.length, allocations.values());
+    rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + updatedAlloc.length, allocations, resCalc, minAlloc);
+    plan.updateReservation(rAllocation);
+    doAssertions(plan, rAllocation);
+    // The expected usage follows the same step rule as the allocation
+    for (int i = 0; i < updatedAlloc.length; i++) {
+      int containers = updatedAlloc[i] + i;
+      Resource expected = Resource.newInstance(1024 * containers, containers);
+      Assert.assertEquals(expected, plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(expected, plan.getConsumptionForUser(user, start + i));
+    }
+  }
+
+  /**
+   * Updating an unknown reservation must throw IllegalArgumentException.
+   */
+  @Test
+  public void testUpdateNonExistingReservation() throws PlanningException {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.updateReservation(rAllocation);
+      Assert.fail("Update should fail as it does not exist in the plan");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
+    }
+    Assert.assertNull(plan.getReservationById(reservationID));
+  }
+
+  /**
+   * Adds a stepped reservation, checks the plan-wide and per-user usage for
+   * each of its intervals, then deletes it and expects the reservation to be
+   * gone and both usages to be zero again.
+   *
+   * @throws PlanningException if the plan rejects the add or the delete
+   */
+  @Test
+  public void testDeleteReservation() throws PlanningException {
+    // First add a reservation
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    // isStep = true: interval i asks for alloc[i] + i containers
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, true);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    // A PlanningException propagates so that JUnit reports its stack trace
+    plan.addReservation(rAllocation);
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc.length; i++) {
+      int containers = alloc[i] + i;
+      Resource expected = Resource.newInstance(1024 * containers, containers);
+      Assert.assertEquals(expected, plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(expected, plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Now delete it
+    plan.deleteReservation(reservationID);
+    Assert.assertNull(plan.getReservationById(reservationID));
+
+    // Nothing else was added, so no usage may remain
+    Resource none = Resource.newInstance(0, 0);
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(none, plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(none, plan.getConsumptionForUser(user, start + i));
+    }
+  }
+
+  /**
+   * Deleting an unknown reservation must throw IllegalArgumentException.
+   */
+  @Test
+  public void testDeleteNonExistingReservation() throws PlanningException {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.deleteReservation(reservationID);
+      Assert.fail("Delete should fail as it does not exist in the plan");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
+    }
+    Assert.assertNull(plan.getReservationById(reservationID));
+  }
+
+  /**
+   * Adds two reservations that both start at 100: the first one ends at 106,
+   * the second one at 105. With the policy mock reporting a valid window of
+   * 1, the plan is then asked to archive twice. At time 106 only the second
+   * reservation may be removed, so the usage must fall back to what the
+   * first reservation alone asks for. At time 107 the first reservation must
+   * be removed as well, so no usage may remain for any of its intervals.
+   *
+   * @throws PlanningException if the plan rejects an add or an archive call;
+   *           it propagates so that JUnit reports the original stack trace
+   */
+  @Test
+  public void testArchiveCompletedReservations() throws PlanningException {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID1 =
+        ReservationSystemTestUtil.getNewReservationId();
+    // First add a reservation
+    int[] alloc1 = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations1 =
+        generateAllocation(start, alloc1, false);
+    ReservationDefinition rDef1 =
+        createSimpleReservationDefinition(start, start + alloc1.length,
+            alloc1.length, allocations1.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID1, rDef1, user,
+            planName, start, start + alloc1.length, allocations1, resCalc,
+            minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID1));
+    plan.addReservation(rAllocation);
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc1.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+          plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Now add another one
+    ReservationId reservationID2 =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc2 = { 0, 5, 10, 5, 0 };
+    // isStep = true: interval i asks for alloc2[i] + i containers
+    Map<ReservationInterval, ReservationRequest> allocations2 =
+        generateAllocation(start, alloc2, true);
+    ReservationDefinition rDef2 =
+        createSimpleReservationDefinition(start, start + alloc2.length,
+            alloc2.length, allocations2.values());
+    rAllocation =
+        new InMemoryReservationAllocation(reservationID2, rDef2, user,
+            planName, start, start + alloc2.length, allocations2, resCalc,
+            minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID2));
+    plan.addReservation(rAllocation);
+    Assert.assertNotNull(plan.getReservationById(reservationID2));
+    // Both reservations overlap, so their requests add up per interval
+    for (int i = 0; i < alloc2.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+              + alloc2[i] + i), plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+              + alloc2[i] + i), plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Now archive completed reservations
+    when(clock.getTime()).thenReturn(106L);
+    when(policy.getValidWindow()).thenReturn(1L);
+    // will only remove 2nd reservation as only that has fallen out of the
+    // archival window
+    plan.archiveCompletedReservations(clock.getTime());
+    Assert.assertNotNull(plan.getReservationById(reservationID1));
+    Assert.assertNull(plan.getReservationById(reservationID2));
+    // Only the first reservation still contributes to the usage
+    for (int i = 0; i < alloc1.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+          plan.getConsumptionForUser(user, start + i));
+    }
+
+    when(clock.getTime()).thenReturn(107L);
+    // will remove 1st reservation also as it has fallen out of the archival
+    // window
+    plan.archiveCompletedReservations(clock.getTime());
+    Assert.assertNull(plan.getReservationById(reservationID1));
+    // No reservation is left, so no usage may remain
+    for (int i = 0; i < alloc1.length; i++) {
+      Assert.assertEquals(Resource.newInstance(0, 0),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(Resource.newInstance(0, 0),
+          plan.getConsumptionForUser(user, start + i));
+    }
+  }
+
+  /** Checks that plan holds exactly rAllocation and echoes the fixtures. */
+  private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
+    ReservationId reservationID = rAllocation.getReservationId();
+    Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
+    Assert.assertEquals(1, ((InMemoryPlan) plan).getAllReservations().size());
+    Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
+    Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
+    Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
+    Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
+    Assert.assertEquals(resCalc, plan.getResourceCalculator());
+    Assert.assertEquals(planName, plan.getQueueName());
+    Assert.assertTrue(plan.getMoveOnExpiry());
+  }
+
+  /** Wraps a copy of resources in an R_ALL definition; duration is unused. */
+  private ReservationDefinition createSimpleReservationDefinition(long arrival,
+      long deadline, long duration, Collection<ReservationRequest> resources) {
+    ReservationRequests asks = new ReservationRequestsPBImpl();
+    asks.setReservationResources(new ArrayList<ReservationRequest>(resources));
+    asks.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition definition = new ReservationDefinitionPBImpl();
+    definition.setReservationRequests(asks);
+    definition.setArrival(arrival);
+    definition.setDeadline(deadline);
+    return definition;
+  }
+
+  /**
+   * Maps the ReservationInterval from startTime + i to startTime + i + 1 to
+   * a request for alloc[i] containers of Resource.newInstance(1024, 1).
+   *
+   * @param isStep when true, interval i asks for alloc[i] + i containers
+   */
+  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+      int startTime, int[] alloc, boolean isStep) {
+    Map<ReservationInterval, ReservationRequest> out =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    for (int i = 0; i < alloc.length; i++) {
+      int count = isStep ? alloc[i] + i : alloc[i];
+      ReservationRequest ask =
+          ReservationRequest.newInstance(Resource.newInstance(1024, 1), count);
+      out.put(new ReservationInterval(startTime + i, startTime + i + 1), ask);
+    }
+    return out;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63250ef9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
new file mode 100644
index 0000000..f4c4581
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
@@ -0,0 +1,206 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInMemoryReservationAllocation {
+
+  private String user = "yarn";
+  private String planName = "test-reservation";
+  private ResourceCalculator resCalc;
+  private Resource minAlloc;
+
+  private Random rand = new Random();
+
+  @Before
+  public void setUp() {
+    minAlloc = Resource.newInstance(1, 1); // recreated for every test
+    resCalc = new DefaultResourceCalculator();
+  }
+
+  /** Releases what setUp() created; user and planName are left intact. */
+  @After
+  public void tearDown() {
+    // user/planName are set at declaration only: nothing would restore them
+    resCalc = null;
+    minAlloc = null;
+  }
+
+  @Test
+  public void testBlocks() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = { 10, 10, 10, 10, 10, 10 }; // containers per interval
+    int start = 100;
+    int end = start + alloc.length + 1; // one past the last interval's end
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, end, alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false, false);
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, end, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+    Assert.assertFalse(rAllocation.containsGangs());
+    for (int i = 0; i < alloc.length; i++) {
+      Resource expected = Resource.newInstance(1024 * alloc[i], alloc[i]);
+      Assert.assertEquals(expected, rAllocation.getResourcesAtTime(start + i));
+    }
+  }
+
+  @Test
+  public void testSteps() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = { 10, 10, 10, 10, 10, 10 }; // base containers per interval
+    int start = 100;
+    int end = start + alloc.length + 1; // one past the last interval's end
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, end, alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, true, false);
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, end, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+    Assert.assertFalse(rAllocation.containsGangs());
+    for (int i = 0; i < alloc.length; i++) {
+      int containers = alloc[i] + i; // isStep adds the interval index
+      Resource expected = Resource.newInstance(1024 * containers, containers);
+      Assert.assertEquals(expected, rAllocation.getResourcesAtTime(start + i));
+    }
+  }
+
+  @Test
+  public void testSkyline() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = { 0, 5, 10, 10, 5, 0 }; // base containers per interval
+    int start = 100;
+    int end = start + alloc.length + 1; // one past the last interval's end
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, end, alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, true, false);
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, end, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+    Assert.assertFalse(rAllocation.containsGangs());
+    for (int i = 0; i < alloc.length; i++) {
+      int containers = alloc[i] + i; // isStep adds the interval index
+      Resource expected = Resource.newInstance(1024 * containers, containers);
+      Assert.assertEquals(expected, rAllocation.getResourcesAtTime(start + i));
+    }
+  }
+
+  @Test
+  public void testZeroAlloaction() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = {}; // no intervals at all
+    long start = 0;
+    long end = start + alloc.length + 1;
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, end, alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, end, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, (int) start,
+        alloc);
+    Assert.assertFalse(rAllocation.containsGangs());
+  }
+
+  @Test
+  public void testGangAlloaction() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = { 10, 10, 10, 10, 10, 10 }; // containers per interval
+    int start = 100;
+    int end = start + alloc.length + 1; // one past the last interval's end
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, end, alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false, true);
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, end, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+    Assert.assertTrue(rAllocation.containsGangs());
+    for (int i = 0; i < alloc.length; i++) {
+      Resource expected = Resource.newInstance(1024 * alloc[i], alloc[i]);
+      Assert.assertEquals(expected, rAllocation.getResourcesAtTime(start + i));
+    }
+  }
+
+  private void doAssertions(ReservationAllocation rAllocation,
+      ReservationId reservationID, ReservationDefinition rDef,
+      Map<ReservationInterval, ReservationRequest> allocations, int start,
+      int[] alloc) { // compares each getter with the value the test passed in
+    Assert.assertEquals(reservationID, rAllocation.getReservationId());
+    Assert.assertEquals(rDef, rAllocation.getReservationDefinition());
+    Assert.assertEquals(allocations, rAllocation.getAllocationRequests());
+    Assert.assertEquals(user, rAllocation.getUser()); // class-level fixture
+    Assert.assertEquals(planName, rAllocation.getPlanName()); // ditto
+    Assert.assertEquals(start, rAllocation.getStartTime());
+    Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime());
+  }
+
+  /** Builds an R_ALL definition holding one request with the given duration. */
+  private ReservationDefinition createSimpleReservationDefinition(long arrival,
+      long deadline, long duration) {
+    Resource unit = Resource.newInstance(1024, 1);
+    ReservationRequest ask =
+        ReservationRequest.newInstance(unit, 1, 1, duration);
+    ReservationRequests asks = new ReservationRequestsPBImpl();
+    asks.setReservationResources(Collections.singletonList(ask));
+    asks.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition definition = new ReservationDefinitionPBImpl();
+    definition.setReservationRequests(asks);
+    definition.setArrival(arrival);
+    definition.setDeadline(deadline);
+    return definition;
+  }
+
+  /**
+   * Maps the ReservationInterval from startTime + i to startTime + i + 1 to
+   * a request for alloc[i] containers of Resource.newInstance(1024, 1).
+   * @param isStep when true, interval i asks for alloc[i] + i containers
+   * @param isGang when true, each request's concurrency is set to its size
+   */
+  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+      int startTime, int[] alloc, boolean isStep, boolean isGang) {
+    Map<ReservationInterval, ReservationRequest> out =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    for (int i = 0; i < alloc.length; i++) {
+      int count = isStep ? alloc[i] + i : alloc[i];
+      ReservationRequest ask =
+          ReservationRequest.newInstance(Resource.newInstance(1024, 1), count);
+      if (isGang) {
+        ask.setConcurrency(count);
+      }
+      out.put(new ReservationInterval(startTime + i, startTime + i + 1), ask);
+    }
+    return out;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63250ef9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
new file mode 100644
index 0000000..ab0de6b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
@@ -0,0 +1,169 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link RLESparseResourceAllocation}: intervals are added, the
+ * capacity reported by {@code getCapacityAtTime} is checked inside and outside
+ * the added range, and the same intervals are then removed again.
+ */
+public class TestRLESparseResourceAllocation {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestRLESparseResourceAllocation.class);
+
+  /** Flat profile: the same number of containers in every time slot. */
+  @Test
+  public void testBlocks() {
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    verifyAddThenRemove(alloc, false);
+  }
+
+  /** Flat base profile with step mode on, i.e. slot i holds alloc[i] + i. */
+  @Test
+  public void testSteps() {
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    verifyAddThenRemove(alloc, true);
+  }
+
+  /**
+   * Rising-then-falling base profile.
+   * NOTE(review): step mode is enabled here as well, so the effective profile
+   * is alloc[i] + i rather than the literal array - confirm this is intended.
+   */
+  @Test
+  public void testSkyline() {
+    int[] alloc = { 0, 5, 10, 10, 5, 0 };
+    verifyAddThenRemove(alloc, true);
+  }
+
+  /**
+   * Adds a single zero-sized request spanning 0 to Long.MAX_VALUE and expects
+   * the allocation to report zero capacity and to remain empty.
+   */
+  @Test
+  public void testZeroAlloaction() {
+    ResourceCalculator resCalc = new DefaultResourceCalculator();
+    Resource minAlloc = Resource.newInstance(1, 1);
+    RLESparseResourceAllocation rleSparseVector =
+        new RLESparseResourceAllocation(resCalc, minAlloc);
+    rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
+        ReservationRequest.newInstance(Resource.newInstance(0, 0), (0)));
+    LOG.info(rleSparseVector.toString());
+    // The probe time is random; keep it so a failure can be reproduced.
+    long probeTime = new Random().nextLong();
+    Assert.assertEquals("Unexpected capacity at time " + probeTime,
+        Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(probeTime));
+    Assert.assertTrue(rleSparseVector.isEmpty());
+  }
+
+  /**
+   * Shared body of the add/remove tests. Adds the generated allocation
+   * starting at time 100, checks the capacity before, inside and after the
+   * added range, removes every interval again and checks that the allocation
+   * reports zero capacity and is empty.
+   *
+   * @param alloc base number of containers for each consecutive time slot
+   * @param isStep whether slot {@code i} holds {@code alloc[i] + i} containers
+   *          instead of {@code alloc[i]}
+   */
+  private void verifyAddThenRemove(int[] alloc, boolean isStep) {
+    ResourceCalculator resCalc = new DefaultResourceCalculator();
+    Resource minAlloc = Resource.newInstance(1, 1);
+    RLESparseResourceAllocation rleSparseVector =
+        new RLESparseResourceAllocation(resCalc, minAlloc);
+    int start = 100;
+    Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+        generateAllocation(start, alloc, isStep).entrySet();
+
+    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+      rleSparseVector.addInterval(ip.getKey(), ip.getValue());
+    }
+    LOG.info(rleSparseVector.toString());
+    Assert.assertFalse(rleSparseVector.isEmpty());
+
+    // Outside of the added range nothing must be allocated.
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(start - 1));
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
+
+    // Inside the range every slot must report what generateAllocation built:
+    // the container count times Resource.newInstance(1024, 1).
+    for (int i = 0; i < alloc.length; i++) {
+      int expected = isStep ? alloc[i] + i : alloc[i];
+      Assert.assertEquals("Unexpected capacity at time " + (start + i),
+          Resource.newInstance(1024 * expected, expected),
+          rleSparseVector.getCapacityAtTime(start + i));
+    }
+
+    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+      rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
+    }
+    LOG.info(rleSparseVector.toString());
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals("Unexpected capacity at time " + (start + i)
+          + " after removal", Resource.newInstance(0, 0),
+          rleSparseVector.getCapacityAtTime(start + i));
+    }
+    Assert.assertTrue(rleSparseVector.isEmpty());
+  }
+
+  /**
+   * Builds one request per entry of {@code alloc}, keyed by
+   * {@code new ReservationInterval(startTime + i, startTime + i + 1)}.
+   *
+   * @param startTime start of the first generated interval
+   * @param alloc base number of containers for each consecutive interval
+   * @param isStep when true, slot {@code i} requests {@code alloc[i] + i}
+   *          containers instead of {@code alloc[i]}
+   * @return map from each generated interval to its request
+   */
+  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+      int startTime, int[] alloc, boolean isStep) {
+    Map<ReservationInterval, ReservationRequest> req =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    for (int i = 0; i < alloc.length; i++) {
+      int numContainers = isStep ? alloc[i] + i : alloc[i];
+      req.put(new ReservationInterval(startTime + i, startTime + i + 1),
+          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+              numContainers));
+    }
+    return req;
+  }
+
+}