You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:22 UTC
[07/33] YARN-1709. In-memory data structures used to track resources
over time to enable reservations. (cherry picked from commit
0d8b2cd88b958b1e602fd4ea4078ef8d4742a7c3) (cherry picked from commit
cf4b34282aafee9f6b09d3433c4de1ae4b359168)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63250ef9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
new file mode 100644
index 0000000..6dcd41f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -0,0 +1,477 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInMemoryPlan {
+
+ private String user = "yarn";
+ private String planName = "test-reservation";
+ private ResourceCalculator resCalc;
+ private Resource minAlloc;
+ private Resource maxAlloc;
+ private Resource totalCapacity;
+
+ private Clock clock;
+ private QueueMetrics queueMetrics;
+ private SharingPolicy policy;
+ private ReservationAgent agent;
+ private Planner replanner;
+
+ @Before
+ public void setUp() throws PlanningException {
+ resCalc = new DefaultResourceCalculator();
+ minAlloc = Resource.newInstance(1024, 1);
+ maxAlloc = Resource.newInstance(64 * 1024, 20);
+ totalCapacity = Resource.newInstance(100 * 1024, 100);
+
+ clock = mock(Clock.class);
+ queueMetrics = mock(QueueMetrics.class);
+ policy = mock(SharingPolicy.class);
+ replanner = mock(Planner.class);
+
+ when(clock.getTime()).thenReturn(1L);
+ }
+
+ @After
+ public void tearDown() {
+ resCalc = null;
+ minAlloc = null;
+ maxAlloc = null;
+ totalCapacity = null;
+
+ clock = null;
+ queueMetrics = null;
+ policy = null;
+ replanner = null;
+ }
+
+ @Test
+ public void testAddReservation() {
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true);
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ Map<ReservationInterval, ReservationRequest> allocations =
+ generateAllocation(start, alloc, false);
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length,
+ alloc.length, allocations.values());
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length, allocations, resCalc, minAlloc);
+ Assert.assertNull(plan.getReservationById(reservationID));
+ try {
+ plan.addReservation(rAllocation);
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ doAssertions(plan, rAllocation);
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ plan.getConsumptionForUser(user, start + i));
+ }
+ }
+
+ @Test
+ public void testAddEmptyReservation() {
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true);
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc = {};
+ int start = 100;
+ Map<ReservationInterval, ReservationRequest> allocations =
+ new HashMap<ReservationInterval, ReservationRequest>();
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length,
+ alloc.length, allocations.values());
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length, allocations, resCalc, minAlloc);
+ Assert.assertNull(plan.getReservationById(reservationID));
+ try {
+ plan.addReservation(rAllocation);
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAddReservationAlreadyExists() {
+ // First add a reservation
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true);
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ Map<ReservationInterval, ReservationRequest> allocations =
+ generateAllocation(start, alloc, false);
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length,
+ alloc.length, allocations.values());
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length, allocations, resCalc, minAlloc);
+ Assert.assertNull(plan.getReservationById(reservationID));
+ try {
+ plan.addReservation(rAllocation);
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ doAssertions(plan, rAllocation);
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ plan.getConsumptionForUser(user, start + i));
+ }
+
+ // Try to add it again
+ try {
+ plan.addReservation(rAllocation);
+ Assert.fail("Add should fail as it already exists");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().endsWith("already exists"));
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ doAssertions(plan, rAllocation);
+ }
+
+ @Test
+ public void testUpdateReservation() {
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true);
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ // First add a reservation
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ Map<ReservationInterval, ReservationRequest> allocations =
+ generateAllocation(start, alloc, false);
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length,
+ alloc.length, allocations.values());
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length, allocations, resCalc, minAlloc);
+ Assert.assertNull(plan.getReservationById(reservationID));
+ try {
+ plan.addReservation(rAllocation);
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ doAssertions(plan, rAllocation);
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ plan.getConsumptionForUser(user, start + i));
+ }
+
+ // Now update it
+ start = 110;
+ int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 };
+ allocations = generateAllocation(start, updatedAlloc, true);
+ rDef =
+ createSimpleReservationDefinition(start, start + updatedAlloc.length,
+ updatedAlloc.length, allocations.values());
+ rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + updatedAlloc.length, allocations, resCalc, minAlloc);
+ try {
+ plan.updateReservation(rAllocation);
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ doAssertions(plan, rAllocation);
+ for (int i = 0; i < updatedAlloc.length; i++) {
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ + i), plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ + i), plan.getConsumptionForUser(user, start + i));
+ }
+ }
+
+ @Test
+ public void testUpdateNonExistingReservation() {
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true);
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ // Try to update a reservation without adding
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ Map<ReservationInterval, ReservationRequest> allocations =
+ generateAllocation(start, alloc, false);
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length,
+ alloc.length, allocations.values());
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length, allocations, resCalc, minAlloc);
+ Assert.assertNull(plan.getReservationById(reservationID));
+ try {
+ plan.updateReservation(rAllocation);
+ Assert.fail("Update should fail as it does not exist in the plan");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNull(plan.getReservationById(reservationID));
+ }
+
+ @Test
+ public void testDeleteReservation() {
+ // First add a reservation
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true);
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ Map<ReservationInterval, ReservationRequest> allocations =
+ generateAllocation(start, alloc, true);
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length,
+ alloc.length, allocations.values());
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length, allocations, resCalc, minAlloc);
+ Assert.assertNull(plan.getReservationById(reservationID));
+ try {
+ plan.addReservation(rAllocation);
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ doAssertions(plan, rAllocation);
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+ plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+ plan.getConsumptionForUser(user, start + i));
+ }
+
+ // Now delete it
+ try {
+ plan.deleteReservation(reservationID);
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNull(plan.getReservationById(reservationID));
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ plan.getConsumptionForUser(user, start + i));
+ }
+ }
+
+ @Test
+ public void testDeleteNonExistingReservation() {
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true);
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ // Try to delete a reservation without adding
+ Assert.assertNull(plan.getReservationById(reservationID));
+ try {
+ plan.deleteReservation(reservationID);
+ Assert.fail("Delete should fail as it does not exist in the plan");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNull(plan.getReservationById(reservationID));
+ }
+
+ @Test
+ public void testArchiveCompletedReservations() {
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true);
+ ReservationId reservationID1 =
+ ReservationSystemTestUtil.getNewReservationId();
+ // First add a reservation
+ int[] alloc1 = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ Map<ReservationInterval, ReservationRequest> allocations1 =
+ generateAllocation(start, alloc1, false);
+ ReservationDefinition rDef1 =
+ createSimpleReservationDefinition(start, start + alloc1.length,
+ alloc1.length, allocations1.values());
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID1, rDef1, user,
+ planName, start, start + alloc1.length, allocations1, resCalc,
+ minAlloc);
+ Assert.assertNull(plan.getReservationById(reservationID1));
+ try {
+ plan.addReservation(rAllocation);
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ doAssertions(plan, rAllocation);
+ for (int i = 0; i < alloc1.length; i++) {
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+ plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+ plan.getConsumptionForUser(user, start + i));
+ }
+
+ // Now add another one
+ ReservationId reservationID2 =
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc2 = { 0, 5, 10, 5, 0 };
+ Map<ReservationInterval, ReservationRequest> allocations2 =
+ generateAllocation(start, alloc2, true);
+ ReservationDefinition rDef2 =
+ createSimpleReservationDefinition(start, start + alloc2.length,
+ alloc2.length, allocations2.values());
+ rAllocation =
+ new InMemoryReservationAllocation(reservationID2, rDef2, user,
+ planName, start, start + alloc2.length, allocations2, resCalc,
+ minAlloc);
+ Assert.assertNull(plan.getReservationById(reservationID2));
+ try {
+ plan.addReservation(rAllocation);
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(plan.getReservationById(reservationID2));
+ for (int i = 0; i < alloc2.length; i++) {
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ + alloc2[i] + i), plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ + alloc2[i] + i), plan.getConsumptionForUser(user, start + i));
+ }
+
+ // Now archive completed reservations
+ when(clock.getTime()).thenReturn(106L);
+ when(policy.getValidWindow()).thenReturn(1L);
+ try {
+ // will only remove 2nd reservation as only that has fallen out of the
+ // archival window
+ plan.archiveCompletedReservations(clock.getTime());
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(plan.getReservationById(reservationID1));
+ Assert.assertNull(plan.getReservationById(reservationID2));
+ for (int i = 0; i < alloc1.length; i++) {
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+ plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+ plan.getConsumptionForUser(user, start + i));
+ }
+ when(clock.getTime()).thenReturn(107L);
+ try {
+ // will remove 1st reservation also as it has fallen out of the archival
+ // window
+ plan.archiveCompletedReservations(clock.getTime());
+ } catch (PlanningException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNull(plan.getReservationById(reservationID1));
+ for (int i = 0; i < alloc1.length; i++) {
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ plan.getConsumptionForUser(user, start + i));
+ }
+ }
+
+ private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
+ ReservationId reservationID = rAllocation.getReservationId();
+ Assert.assertNotNull(plan.getReservationById(reservationID));
+ Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
+ Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1);
+ Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
+ Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
+ Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
+ Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
+ Assert.assertEquals(resCalc, plan.getResourceCalculator());
+ Assert.assertEquals(planName, plan.getQueueName());
+ Assert.assertTrue(plan.getMoveOnExpiry());
+ }
+
+ private ReservationDefinition createSimpleReservationDefinition(long arrival,
+ long deadline, long duration, Collection<ReservationRequest> resources) {
+ // create a request with a single atomic ask
+ ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setReservationResources(new ArrayList<ReservationRequest>(resources));
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ rDef.setReservationRequests(reqs);
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ return rDef;
+ }
+
+ private Map<ReservationInterval, ReservationRequest> generateAllocation(
+ int startTime, int[] alloc, boolean isStep) {
+ Map<ReservationInterval, ReservationRequest> req =
+ new HashMap<ReservationInterval, ReservationRequest>();
+ int numContainers = 0;
+ for (int i = 0; i < alloc.length; i++) {
+ if (isStep) {
+ numContainers = alloc[i] + i;
+ } else {
+ numContainers = alloc[i];
+ }
+ ReservationRequest rr =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ (numContainers));
+ req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
+ }
+ return req;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63250ef9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
new file mode 100644
index 0000000..f4c4581
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
@@ -0,0 +1,206 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInMemoryReservationAllocation {
+
+ private String user = "yarn";
+ private String planName = "test-reservation";
+ private ResourceCalculator resCalc;
+ private Resource minAlloc;
+
+ private Random rand = new Random();
+
+ @Before
+ public void setUp() {
+ resCalc = new DefaultResourceCalculator();
+ minAlloc = Resource.newInstance(1, 1);
+ }
+
+ @After
+ public void tearDown() {
+ user = null;
+ planName = null;
+ resCalc = null;
+ minAlloc = null;
+ }
+
+ @Test
+ public void testBlocks() {
+ ReservationId reservationID =
+ ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length + 1,
+ alloc.length);
+ Map<ReservationInterval, ReservationRequest> allocations =
+ generateAllocation(start, alloc, false, false);
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+ doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+ Assert.assertFalse(rAllocation.containsGangs());
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ rAllocation.getResourcesAtTime(start + i));
+ }
+ }
+
+ @Test
+ public void testSteps() {
+ ReservationId reservationID =
+ ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length + 1,
+ alloc.length);
+ Map<ReservationInterval, ReservationRequest> allocations =
+ generateAllocation(start, alloc, true, false);
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+ doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+ Assert.assertFalse(rAllocation.containsGangs());
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+ rAllocation.getResourcesAtTime(start + i));
+ }
+ }
+
+ @Test
+ public void testSkyline() {
+ ReservationId reservationID =
+ ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+ int[] alloc = { 0, 5, 10, 10, 5, 0 };
+ int start = 100;
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length + 1,
+ alloc.length);
+ Map<ReservationInterval, ReservationRequest> allocations =
+ generateAllocation(start, alloc, true, false);
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+ doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+ Assert.assertFalse(rAllocation.containsGangs());
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+ rAllocation.getResourcesAtTime(start + i));
+ }
+ }
+
+ @Test
+ public void testZeroAlloaction() {
+ ReservationId reservationID =
+ ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+ int[] alloc = {};
+ long start = 0;
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length + 1,
+ alloc.length);
+ Map<ReservationInterval, ReservationRequest> allocations =
+ new HashMap<ReservationInterval, ReservationRequest>();
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+ doAssertions(rAllocation, reservationID, rDef, allocations, (int) start,
+ alloc);
+ Assert.assertFalse(rAllocation.containsGangs());
+ }
+
+ @Test
+ public void testGangAlloaction() {
+ ReservationId reservationID =
+ ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ ReservationDefinition rDef =
+ createSimpleReservationDefinition(start, start + alloc.length + 1,
+ alloc.length);
+ Map<ReservationInterval, ReservationRequest> allocations =
+ generateAllocation(start, alloc, false, true);
+ ReservationAllocation rAllocation =
+ new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+ start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+ doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+ Assert.assertTrue(rAllocation.containsGangs());
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ rAllocation.getResourcesAtTime(start + i));
+ }
+ }
+
+ private void doAssertions(ReservationAllocation rAllocation,
+ ReservationId reservationID, ReservationDefinition rDef,
+ Map<ReservationInterval, ReservationRequest> allocations, int start,
+ int[] alloc) {
+ Assert.assertEquals(reservationID, rAllocation.getReservationId());
+ Assert.assertEquals(rDef, rAllocation.getReservationDefinition());
+ Assert.assertEquals(allocations, rAllocation.getAllocationRequests());
+ Assert.assertEquals(user, rAllocation.getUser());
+ Assert.assertEquals(planName, rAllocation.getPlanName());
+ Assert.assertEquals(start, rAllocation.getStartTime());
+ Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime());
+ }
+
+ private ReservationDefinition createSimpleReservationDefinition(long arrival,
+ long deadline, long duration) {
+ // create a request with a single atomic ask
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
+ duration);
+ ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setReservationResources(Collections.singletonList(r));
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ rDef.setReservationRequests(reqs);
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ return rDef;
+ }
+
+ private Map<ReservationInterval, ReservationRequest> generateAllocation(
+ int startTime, int[] alloc, boolean isStep, boolean isGang) {
+ Map<ReservationInterval, ReservationRequest> req =
+ new HashMap<ReservationInterval, ReservationRequest>();
+ int numContainers = 0;
+ for (int i = 0; i < alloc.length; i++) {
+ if (isStep) {
+ numContainers = alloc[i] + i;
+ } else {
+ numContainers = alloc[i];
+ }
+ ReservationRequest rr =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ (numContainers));
+ if (isGang) {
+ rr.setConcurrency(numContainers);
+ }
+ req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
+ }
+ return req;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63250ef9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
new file mode 100644
index 0000000..ab0de6b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
@@ -0,0 +1,169 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestRLESparseResourceAllocation {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestRLESparseResourceAllocation.class);
+
+ @Test
+ public void testBlocks() {
+ ResourceCalculator resCalc = new DefaultResourceCalculator();
+ Resource minAlloc = Resource.newInstance(1, 1);
+
+ RLESparseResourceAllocation rleSparseVector =
+ new RLESparseResourceAllocation(resCalc, minAlloc);
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+ generateAllocation(start, alloc, false).entrySet();
+ for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ rleSparseVector.addInterval(ip.getKey(), ip.getValue());
+ }
+ LOG.info(rleSparseVector.toString());
+ Assert.assertFalse(rleSparseVector.isEmpty());
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(99));
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ rleSparseVector.getCapacityAtTime(start + i));
+ }
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
+ for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
+ }
+ LOG.info(rleSparseVector.toString());
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(start + i));
+ }
+ Assert.assertTrue(rleSparseVector.isEmpty());
+ }
+
+ @Test
+ public void testSteps() {
+ ResourceCalculator resCalc = new DefaultResourceCalculator();
+ Resource minAlloc = Resource.newInstance(1, 1);
+
+ RLESparseResourceAllocation rleSparseVector =
+ new RLESparseResourceAllocation(resCalc, minAlloc);
+ int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ int start = 100;
+ Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+ generateAllocation(start, alloc, true).entrySet();
+ for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ rleSparseVector.addInterval(ip.getKey(), ip.getValue());
+ }
+ LOG.info(rleSparseVector.toString());
+ Assert.assertFalse(rleSparseVector.isEmpty());
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(99));
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+ rleSparseVector.getCapacityAtTime(start + i));
+ }
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
+ for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
+ }
+ LOG.info(rleSparseVector.toString());
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(start + i));
+ }
+ Assert.assertTrue(rleSparseVector.isEmpty());
+ }
+
+ @Test
+ public void testSkyline() {
+ ResourceCalculator resCalc = new DefaultResourceCalculator();
+ Resource minAlloc = Resource.newInstance(1, 1);
+
+ RLESparseResourceAllocation rleSparseVector =
+ new RLESparseResourceAllocation(resCalc, minAlloc);
+ int[] alloc = { 0, 5, 10, 10, 5, 0 };
+ int start = 100;
+ Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+ generateAllocation(start, alloc, true).entrySet();
+ for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ rleSparseVector.addInterval(ip.getKey(), ip.getValue());
+ }
+ LOG.info(rleSparseVector.toString());
+ Assert.assertFalse(rleSparseVector.isEmpty());
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(99));
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+ rleSparseVector.getCapacityAtTime(start + i));
+ }
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
+ for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
+ }
+ LOG.info(rleSparseVector.toString());
+ for (int i = 0; i < alloc.length; i++) {
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(start + i));
+ }
+ Assert.assertTrue(rleSparseVector.isEmpty());
+ }
+
+ @Test
+ public void testZeroAlloaction() {
+ ResourceCalculator resCalc = new DefaultResourceCalculator();
+ Resource minAlloc = Resource.newInstance(1, 1);
+ RLESparseResourceAllocation rleSparseVector =
+ new RLESparseResourceAllocation(resCalc, minAlloc);
+ rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
+ ReservationRequest.newInstance(Resource.newInstance(0, 0), (0)));
+ LOG.info(rleSparseVector.toString());
+ Assert.assertEquals(Resource.newInstance(0, 0),
+ rleSparseVector.getCapacityAtTime(new Random().nextLong()));
+ Assert.assertTrue(rleSparseVector.isEmpty());
+ }
+
+ private Map<ReservationInterval, ReservationRequest> generateAllocation(
+ int startTime, int[] alloc, boolean isStep) {
+ Map<ReservationInterval, ReservationRequest> req =
+ new HashMap<ReservationInterval, ReservationRequest>();
+ int numContainers = 0;
+ for (int i = 0; i < alloc.length; i++) {
+ if (isStep) {
+ numContainers = alloc[i] + i;
+ } else {
+ numContainers = alloc[i];
+ }
+ req.put(new ReservationInterval(startTime + i, startTime + i + 1),
+
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ (numContainers)));
+ }
+ return req;
+ }
+
+}