You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2017/09/05 21:58:55 UTC
[1/4] hadoop git commit: Revert "Plan/ResourceAllocation data
structure enhancements required to support recurring reservations in
ReservationSystem."
Repository: hadoop
Updated Branches:
refs/heads/branch-2 f382f6625 -> 3a2f3e78f
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index 03569d4..bc98e2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planne
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
@@ -48,10 +47,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-/**
- * Testing the class {@link InMemoryPlan}.
- */
-@SuppressWarnings("checkstyle:nowhitespaceafter")
public class TestInMemoryPlan {
private String user = "yarn";
@@ -67,7 +62,6 @@ public class TestInMemoryPlan {
private ReservationAgent agent;
private Planner replanner;
private RMContext context;
- private long maxPeriodicity;
@Before
public void setUp() throws PlanningException {
@@ -78,7 +72,7 @@ public class TestInMemoryPlan {
clock = mock(Clock.class);
queueMetrics = mock(QueueMetrics.class);
- policy = new NoOverCommitPolicy();
+ policy = mock(SharingPolicy.class);
replanner = mock(Planner.class);
when(clock.getTime()).thenReturn(1L);
@@ -101,41 +95,15 @@ public class TestInMemoryPlan {
@Test
public void testAddReservation() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
+ ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
- Assert.assertNull(plan.getReservationById(reservationID));
- try {
- plan.addReservation(rAllocation, false);
- } catch (PlanningException e) {
- Assert.fail(e.getMessage());
- }
- doAssertions(plan, rAllocation);
- checkAllocation(plan, alloc, start, 0);
- }
-
- @Test
- public void testAddPeriodicReservation() throws PlanningException {
-
- maxPeriodicity = 100;
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, maxPeriodicity,
- context, new UTCClock());
-
- ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
- int[] alloc = { 10, 50 };
- int start = 10;
- long period = 20;
- ReservationAllocation rAllocation = createReservationAllocation(
- reservationID, start, alloc, String.valueOf(period));
- // use periodicity of 1hr
- rAllocation.setPeriodicity(period);
+ ReservationAllocation rAllocation = createReservationAllocation
+ (reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -143,54 +111,32 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
- checkAllocation(plan, alloc, start, period);
-
- RLESparseResourceAllocation available =
- plan.getAvailableResourceOverTime(user, reservationID, 150, 330, 50);
- System.out.println(available);
+ checkAllocation(plan, alloc, start);
}
- private void checkAllocation(Plan plan, int[] alloc, int start,
- long periodicity) {
- long end = start + alloc.length;
- if (periodicity > 0) {
- end = end + maxPeriodicity;
- }
+ private void checkAllocation(Plan plan, int[] alloc, int start) {
RLESparseResourceAllocation userCons =
- plan.getConsumptionForUserOverTime(user, start, end * 3);
+ plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
- // only one instance for non-periodic reservation
- if (periodicity <= 0) {
- Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
- plan.getTotalCommittedResources(start + i));
- Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
- userCons.getCapacityAtTime(start + i));
- } else {
- // periodic reservations should repeat
- long y = 0;
- Resource res = Resource.newInstance(1024 * (alloc[i]), (alloc[i]));
- while (y <= end * 2) {
- Assert.assertEquals("At time: " + start + i + y, res,
- plan.getTotalCommittedResources(start + i + y));
- Assert.assertEquals(" At time: " + (start + i + y), res,
- userCons.getCapacityAtTime(start + i + y));
- y = y + periodicity;
- }
- }
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+ userCons.getCapacityAtTime(start + i));
}
}
@Test
public void testAddEmptyReservation() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
+ ReservationSystemTestUtil.getNewReservationId();
int[] alloc = {};
int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
+ ReservationAllocation rAllocation = createReservationAllocation
+ (reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -202,14 +148,15 @@ public class TestInMemoryPlan {
@Test
public void testAddReservationAlreadyExists() {
// First add a reservation
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
+ ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
+ ReservationAllocation rAllocation = createReservationAllocation
+ (reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -217,7 +164,7 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
- checkAllocation(plan, alloc, start, 0);
+ checkAllocation(plan, alloc, start);
// Try to add it again
try {
@@ -233,15 +180,16 @@ public class TestInMemoryPlan {
@Test
public void testUpdateReservation() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
// First add a reservation
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
+ ReservationAllocation rAllocation = createReservationAllocation
+ (reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -262,8 +210,8 @@ public class TestInMemoryPlan {
// Now update it
start = 110;
int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 };
- rAllocation =
- createReservationAllocation(reservationID, start, updatedAlloc, true);
+ rAllocation = createReservationAllocation(reservationID, start,
+ updatedAlloc, true);
try {
plan.updateReservation(rAllocation);
} catch (PlanningException e) {
@@ -271,71 +219,32 @@ public class TestInMemoryPlan {
}
doAssertions(plan, rAllocation);
- userCons = plan.getConsumptionForUserOverTime(user, start,
- start + updatedAlloc.length);
+ userCons =
+ plan.getConsumptionForUserOverTime(user, start, start
+ + updatedAlloc.length);
for (int i = 0; i < updatedAlloc.length; i++) {
- Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
- updatedAlloc[i] + i), plan.getTotalCommittedResources(start + i));
- Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
- updatedAlloc[i] + i), userCons.getCapacityAtTime(start + i));
- }
- }
-
- @Test
- public void testUpdatePeriodicReservation() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
- // First add a reservation
- ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
- int[] alloc = { 10, 20 };
- int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
- // use periodicity of 1hr
- long period = 3600000;
- rAllocation.getReservationDefinition()
- .setRecurrenceExpression(String.valueOf(period));
- rAllocation.setPeriodicity(period);
- Assert.assertNull(plan.getReservationById(reservationID));
- try {
- plan.addReservation(rAllocation, false);
- } catch (PlanningException e) {
- Assert.fail(e.getMessage());
- }
- System.out.println(plan.toString());
- doAssertions(plan, rAllocation);
- checkAllocation(plan, alloc, start, period);
-
- // Now update it
- start = 110;
- int[] updatedAlloc = { 30, 40, 50 };
- rAllocation =
- createReservationAllocation(reservationID, start, updatedAlloc);
- rAllocation.getReservationDefinition()
- .setRecurrenceExpression(String.valueOf(period));
- rAllocation.setPeriodicity(period);
- try {
- plan.updateReservation(rAllocation);
- } catch (PlanningException e) {
- Assert.fail(e.getMessage());
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ + i), plan.getTotalCommittedResources(start + i));
+ Assert.assertEquals(
+ Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ + i), userCons.getCapacityAtTime(start + i));
}
- doAssertions(plan, rAllocation);
- checkAllocation(plan, updatedAlloc, start, period);
}
@Test
public void testUpdateNonExistingReservation() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
// Try to update a reservation without adding
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
+ createReservationAllocation(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.updateReservation(rAllocation);
@@ -351,14 +260,15 @@ public class TestInMemoryPlan {
@Test
public void testDeleteReservation() {
// First add a reservation
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc, true);
+ createReservationAllocation(reservationID, start, alloc, true);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -397,46 +307,10 @@ public class TestInMemoryPlan {
}
@Test
- public void testDeletePeriodicReservation() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
- // First add a reservation
- ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
- int[] alloc = { 10, 20 };
- int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
- // use periodicity of 1hr
- long period = 3600000;
- rAllocation.getReservationDefinition()
- .setRecurrenceExpression(String.valueOf(period));
- rAllocation.setPeriodicity(period);
- Assert.assertNull(plan.getReservationById(reservationID));
- try {
- plan.addReservation(rAllocation, false);
- } catch (PlanningException e) {
- Assert.fail(e.getMessage());
- }
- System.out.println(plan.toString());
- doAssertions(plan, rAllocation);
- checkAllocation(plan, alloc, start, period);
-
- // Now delete it
- try {
- plan.deleteReservation(reservationID);
- } catch (PlanningException e) {
- Assert.fail(e.getMessage());
- }
- Assert.assertNull(plan.getReservationById(reservationID));
- System.out.print(plan);
- checkAllocation(plan, new int[] { 0, 0 }, start, period);
- }
-
- @Test
public void testDeleteNonExistingReservation() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
// Try to delete a reservation without adding
@@ -454,9 +328,8 @@ public class TestInMemoryPlan {
@Test
public void testArchiveCompletedReservations() {
- SharingPolicy sharingPolicy = mock(SharingPolicy.class);
Plan plan =
- new InMemoryPlan(queueMetrics, sharingPolicy, agent, totalCapacity, 1L,
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID1 =
ReservationSystemTestUtil.getNewReservationId();
@@ -464,7 +337,7 @@ public class TestInMemoryPlan {
int[] alloc1 = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationAllocation rAllocation =
- createReservationAllocation(reservationID1, start, alloc1);
+ createReservationAllocation(reservationID1, start, alloc1);
Assert.assertNull(plan.getReservationById(reservationID1));
try {
plan.addReservation(rAllocation, false);
@@ -472,14 +345,15 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
- checkAllocation(plan, alloc1, start, 0);
+ checkAllocation(plan, alloc1, start);
+
// Now add another one
ReservationId reservationID2 =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc2 = { 0, 5, 10, 5, 0 };
rAllocation =
- createReservationAllocation(reservationID2, start, alloc2, true);
+ createReservationAllocation(reservationID2, start, alloc2, true);
Assert.assertNull(plan.getReservationById(reservationID2));
try {
plan.addReservation(rAllocation, false);
@@ -493,18 +367,16 @@ public class TestInMemoryPlan {
for (int i = 0; i < alloc2.length; i++) {
Assert.assertEquals(
- Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
- alloc1[i] + alloc2[i] + i),
- plan.getTotalCommittedResources(start + i));
+ Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ + alloc2[i] + i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
- Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
- alloc1[i] + alloc2[i] + i),
- userCons.getCapacityAtTime(start + i));
+ Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ + alloc2[i] + i), userCons.getCapacityAtTime(start + i));
}
// Now archive completed reservations
when(clock.getTime()).thenReturn(106L);
- when(sharingPolicy.getValidWindow()).thenReturn(1L);
+ when(policy.getValidWindow()).thenReturn(1L);
try {
// will only remove 2nd reservation as only that has fallen out of the
// archival window
@@ -514,7 +386,7 @@ public class TestInMemoryPlan {
}
Assert.assertNotNull(plan.getReservationById(reservationID1));
Assert.assertNull(plan.getReservationById(reservationID2));
- checkAllocation(plan, alloc1, start, 0);
+ checkAllocation(plan, alloc1, start);
when(clock.getTime()).thenReturn(107L);
try {
@@ -539,14 +411,15 @@ public class TestInMemoryPlan {
@Test
public void testGetReservationsById() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
- int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
+ ReservationAllocation rAllocation = createReservationAllocation
+ (reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -556,30 +429,31 @@ public class TestInMemoryPlan {
// Verify that get by reservation id works.
Set<ReservationAllocation> rAllocations =
- plan.getReservations(reservationID, null, "");
+ plan.getReservations(reservationID, null, "");
Assert.assertTrue(rAllocations.size() == 1);
- Assert.assertTrue(rAllocation
- .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
+ Assert.assertTrue(rAllocation.compareTo(
+ (ReservationAllocation) rAllocations.toArray()[0]) == 0);
// Verify that get by reservation id works even when time range
// and user is invalid.
ReservationInterval interval = new ReservationInterval(0, 0);
rAllocations = plan.getReservations(reservationID, interval, "invalid");
Assert.assertTrue(rAllocations.size() == 1);
- Assert.assertTrue(rAllocation
- .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
+ Assert.assertTrue(rAllocation.compareTo(
+ (ReservationAllocation) rAllocations.toArray()[0]) == 0);
}
@Test
public void testGetReservationsByInvalidId() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
- int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
+ ReservationAllocation rAllocation = createReservationAllocation
+ (reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -589,22 +463,23 @@ public class TestInMemoryPlan {
// If reservationId is null, then nothing is returned.
ReservationId invalidReservationID =
- ReservationSystemTestUtil.getNewReservationId();
+ ReservationSystemTestUtil.getNewReservationId();
Set<ReservationAllocation> rAllocations =
- plan.getReservations(invalidReservationID, null, "");
+ plan.getReservations(invalidReservationID, null, "");
Assert.assertTrue(rAllocations.size() == 0);
}
@Test
public void testGetReservationsByTimeInterval() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
- int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
+ ReservationAllocation rAllocation = createReservationAllocation
+ (reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -614,24 +489,23 @@ public class TestInMemoryPlan {
// Verify that get by time interval works if the selection interval
// completely overlaps with the allocation.
- ReservationInterval interval = new ReservationInterval(
- rAllocation.getStartTime(), rAllocation.getEndTime());
+ ReservationInterval interval = new ReservationInterval(rAllocation
+ .getStartTime(), rAllocation.getEndTime());
Set<ReservationAllocation> rAllocations =
- plan.getReservations(null, interval, "");
+ plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 1);
- Assert.assertTrue(rAllocation
- .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
+ Assert.assertTrue(rAllocation.compareTo(
+ (ReservationAllocation) rAllocations.toArray()[0]) == 0);
// Verify that get by time interval works if the selection interval
// falls within the allocation
long duration = rAllocation.getEndTime() - rAllocation.getStartTime();
- interval = new ReservationInterval(
- rAllocation.getStartTime() + duration * (long) 0.3,
- rAllocation.getEndTime() - duration * (long) 0.3);
+ interval = new ReservationInterval(rAllocation.getStartTime() + duration
+ * (long)0.3, rAllocation.getEndTime() - duration * (long)0.3);
rAllocations = plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 1);
- Assert.assertTrue(rAllocation
- .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
+ Assert.assertTrue(rAllocation.compareTo(
+ (ReservationAllocation) rAllocations.toArray()[0]) == 0);
// Verify that get by time interval selects 1 allocation if the end
// time of the selection interval falls right at the start of the
@@ -639,13 +513,13 @@ public class TestInMemoryPlan {
interval = new ReservationInterval(0, rAllocation.getStartTime());
rAllocations = plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 1);
- Assert.assertTrue(rAllocation
- .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
+ Assert.assertTrue(rAllocation.compareTo(
+ (ReservationAllocation) rAllocations.toArray()[0]) == 0);
// Verify that get by time interval selects no reservations if the start
// time of the selection interval falls right at the end of the allocation.
- interval =
- new ReservationInterval(rAllocation.getEndTime(), Long.MAX_VALUE);
+ interval = new ReservationInterval(rAllocation
+ .getEndTime(), Long.MAX_VALUE);
rAllocations = plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 0);
@@ -658,14 +532,15 @@ public class TestInMemoryPlan {
@Test
public void testGetReservationsAtTime() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
- int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
+ ReservationAllocation rAllocation = createReservationAllocation
+ (reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -674,22 +549,23 @@ public class TestInMemoryPlan {
}
Set<ReservationAllocation> rAllocations =
- plan.getReservationsAtTime(rAllocation.getStartTime());
+ plan.getReservationsAtTime(rAllocation.getStartTime());
Assert.assertTrue(rAllocations.size() == 1);
- Assert.assertTrue(rAllocation
- .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
+ Assert.assertTrue(rAllocation.compareTo(
+ (ReservationAllocation) rAllocations.toArray()[0]) == 0);
}
@Test
public void testGetReservationsWithNoInput() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
- ReservationSystemTestUtil.getNewReservationId();
- int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ ReservationSystemTestUtil.getNewReservationId();
+ int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
- ReservationAllocation rAllocation =
- createReservationAllocation(reservationID, start, alloc);
+ ReservationAllocation rAllocation = createReservationAllocation
+ (reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@@ -700,21 +576,22 @@ public class TestInMemoryPlan {
// Verify that getReservations defaults to getting all reservations if no
// reservationID, time interval, and user is provided,
Set<ReservationAllocation> rAllocations =
- plan.getReservations(null, null, "");
+ plan.getReservations(null, null, "");
Assert.assertTrue(rAllocations.size() == 1);
- Assert.assertTrue(rAllocation
- .compareTo((ReservationAllocation) rAllocations.toArray()[0]) == 0);
+ Assert.assertTrue(rAllocation.compareTo(
+ (ReservationAllocation) rAllocations.toArray()[0]) == 0);
}
@Test
public void testGetReservationsWithNoReservation() {
- Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
- resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
+ Plan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+ resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
// Verify that get reservation returns no entries if no queries are made.
ReservationInterval interval = new ReservationInterval(0, Long.MAX_VALUE);
Set<ReservationAllocation> rAllocations =
- plan.getReservations(null, interval, "");
+ plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 0);
}
@@ -723,9 +600,7 @@ public class TestInMemoryPlan {
Assert.assertNotNull(plan.getReservationById(reservationID));
Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1);
- if (rAllocation.getPeriodicity() <= 0) {
- Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
- }
+ Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
@@ -735,8 +610,7 @@ public class TestInMemoryPlan {
}
private ReservationDefinition createSimpleReservationDefinition(long arrival,
- long deadline, long duration, Collection<ReservationRequest> resources,
- String recurrenceExpression) {
+ long deadline, long duration, Collection<ReservationRequest> resources) {
// create a request with a single atomic ask
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
ReservationRequests reqs = new ReservationRequestsPBImpl();
@@ -745,7 +619,6 @@ public class TestInMemoryPlan {
rDef.setReservationRequests(reqs);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
- rDef.setRecurrenceExpression(recurrenceExpression);
return rDef;
}
@@ -760,43 +633,31 @@ public class TestInMemoryPlan {
} else {
numContainers = alloc[i];
}
- ReservationRequest rr = ReservationRequest
- .newInstance(Resource.newInstance(1024, 1), (numContainers));
+ ReservationRequest rr =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ (numContainers));
req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
}
return req;
}
- private ReservationAllocation createReservationAllocation(
- ReservationId reservationID, int start, int[] alloc) {
- return createReservationAllocation(reservationID, start, alloc, false, "0");
- }
-
- private ReservationAllocation createReservationAllocation(
- ReservationId reservationID, int start, int[] alloc, boolean isStep) {
- return createReservationAllocation(reservationID, start, alloc, isStep,
- "0");
- }
-
- private ReservationAllocation createReservationAllocation(
- ReservationId reservationID, int start, int[] alloc,
- String recurrenceExp) {
- return createReservationAllocation(reservationID, start, alloc, false,
- recurrenceExp);
+ private ReservationAllocation createReservationAllocation(ReservationId
+ reservationID, int start, int[] alloc) {
+ return createReservationAllocation(reservationID, start, alloc, false);
}
- private ReservationAllocation createReservationAllocation(
- ReservationId reservationID, int start, int[] alloc, boolean isStep,
- String recurrenceExp) {
+ private ReservationAllocation createReservationAllocation(ReservationId
+ reservationID, int start, int[] alloc, boolean isStep) {
Map<ReservationInterval, ReservationRequest> allocations =
- generateAllocation(start, alloc, isStep);
+ generateAllocation(start, alloc, isStep);
ReservationDefinition rDef =
- createSimpleReservationDefinition(start, start + alloc.length,
- alloc.length, allocations.values(), recurrenceExp);
+ createSimpleReservationDefinition(start, start + alloc.length,
+ alloc.length, allocations.values());
Map<ReservationInterval, Resource> allocs =
- ReservationSystemUtil.toResources(allocations);
+ ReservationSystemUtil.toResources(allocations);
return new InMemoryReservationAllocation(reservationID, rDef, user,
- planName, start, start + alloc.length, allocs, resCalc, minAlloc);
+ planName,
+ start, start + alloc.length, allocs, resCalc, minAlloc);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java
index 457e2ee..554eb58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java
@@ -19,27 +19,26 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Testing the class {@link PeriodicRLESparseResourceAllocation}.
+ * Testing the class PeriodicRLESparseResourceAllocation.
*/
-@SuppressWarnings("checkstyle:nowhitespaceafter")
public class TestPeriodicRLESparseResourceAllocation {
- private static final Logger LOG =
- LoggerFactory.getLogger(TestPeriodicRLESparseResourceAllocation.class);
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestPeriodicRLESparseResourceAllocation.class);
@Test
public void testPeriodicCapacity() {
- int[] alloc = { 10, 7, 5, 2, 0 };
- long[] timeSteps = { 0L, 5L, 10L, 15L, 19L };
- RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
- .generateRLESparseResourceAllocation(alloc, timeSteps);
+ int[] alloc = {10, 7, 5, 2, 0};
+ long[] timeSteps = {0L, 5L, 10L, 15L, 19L};
+ RLESparseResourceAllocation rleSparseVector =
+ ReservationSystemTestUtil.generateRLESparseResourceAllocation(
+ alloc, timeSteps);
PeriodicRLESparseResourceAllocation periodicVector =
new PeriodicRLESparseResourceAllocation(rleSparseVector, 20L);
LOG.info(periodicVector.toString());
@@ -55,74 +54,43 @@ public class TestPeriodicRLESparseResourceAllocation {
@Test
public void testMaxPeriodicCapacity() {
- int[] alloc = { 2, 5, 7, 10, 3, 4, 6, 8 };
- long[] timeSteps = { 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L };
- RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
- .generateRLESparseResourceAllocation(alloc, timeSteps);
+ int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8};
+ long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
+ RLESparseResourceAllocation rleSparseVector =
+ ReservationSystemTestUtil.generateRLESparseResourceAllocation(
+ alloc, timeSteps);
PeriodicRLESparseResourceAllocation periodicVector =
new PeriodicRLESparseResourceAllocation(rleSparseVector, 8L);
LOG.info(periodicVector.toString());
- Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(0, 1),
+ Assert.assertEquals(
+ periodicVector.getMaximumPeriodicCapacity(0, 1),
Resource.newInstance(10, 10));
- Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(8, 2),
+ Assert.assertEquals(
+ periodicVector.getMaximumPeriodicCapacity(8, 2),
Resource.newInstance(7, 7));
- Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(16, 3),
+ Assert.assertEquals(
+ periodicVector.getMaximumPeriodicCapacity(16, 3),
Resource.newInstance(10, 10));
- Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(17, 4),
+ Assert.assertEquals(
+ periodicVector.getMaximumPeriodicCapacity(17, 4),
Resource.newInstance(5, 5));
- Assert.assertEquals(periodicVector.getMaximumPeriodicCapacity(32, 5),
+ Assert.assertEquals(
+ periodicVector.getMaximumPeriodicCapacity(32, 5),
Resource.newInstance(4, 4));
}
@Test
- public void testMixPeriodicAndNonPeriodic() throws PlanningException {
- int[] alloc = { 2, 5, 0 };
- long[] timeSteps = { 1L, 2L, 3L };
- RLESparseResourceAllocation tempPeriodic = ReservationSystemTestUtil
- .generateRLESparseResourceAllocation(alloc, timeSteps);
- PeriodicRLESparseResourceAllocation periodic =
- new PeriodicRLESparseResourceAllocation(tempPeriodic, 10L);
-
- int[] alloc2 = { 10, 10, 0 };
- long[] timeSteps2 = { 12L, 13L, 14L };
- RLESparseResourceAllocation nonPeriodic = ReservationSystemTestUtil
- .generateRLESparseResourceAllocation(alloc2, timeSteps2);
-
- RLESparseResourceAllocation merged =
- RLESparseResourceAllocation.merge(nonPeriodic.getResourceCalculator(),
- Resource.newInstance(100 * 1024, 100), periodic, nonPeriodic,
- RLESparseResourceAllocation.RLEOperator.add, 2, 25);
-
- Assert.assertEquals(Resource.newInstance(5, 5),
- merged.getCapacityAtTime(2L));
- Assert.assertEquals(Resource.newInstance(0, 0),
- merged.getCapacityAtTime(3L));
- Assert.assertEquals(Resource.newInstance(2, 2),
- merged.getCapacityAtTime(11L));
- Assert.assertEquals(Resource.newInstance(15, 15),
- merged.getCapacityAtTime(12L));
- Assert.assertEquals(Resource.newInstance(10, 10),
- merged.getCapacityAtTime(13L));
- Assert.assertEquals(Resource.newInstance(0, 0),
- merged.getCapacityAtTime(14L));
- Assert.assertEquals(Resource.newInstance(2, 2),
- merged.getCapacityAtTime(21L));
- Assert.assertEquals(Resource.newInstance(5, 5),
- merged.getCapacityAtTime(22L));
- Assert.assertEquals(Resource.newInstance(0, 0),
- merged.getCapacityAtTime(23L));
- }
-
- @Test
public void testSetCapacityInInterval() {
- int[] alloc = { 2, 5, 0 };
- long[] timeSteps = { 1L, 2L, 3L };
- RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
- .generateRLESparseResourceAllocation(alloc, timeSteps);
+ int[] alloc = {2, 5, 0};
+ long[] timeSteps = {1L, 2L, 3L};
+ RLESparseResourceAllocation rleSparseVector =
+ ReservationSystemTestUtil.generateRLESparseResourceAllocation(
+ alloc, timeSteps);
PeriodicRLESparseResourceAllocation periodicVector =
new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L);
ReservationInterval interval = new ReservationInterval(5L, 10L);
- periodicVector.addInterval(interval, Resource.newInstance(8, 8));
+ periodicVector.addInterval(
+ interval, Resource.newInstance(8, 8));
Assert.assertEquals(Resource.newInstance(8, 8),
periodicVector.getCapacityAtTime(5L));
Assert.assertEquals(Resource.newInstance(8, 8),
@@ -131,20 +99,21 @@ public class TestPeriodicRLESparseResourceAllocation {
periodicVector.getCapacityAtTime(10L));
Assert.assertEquals(Resource.newInstance(0, 0),
periodicVector.getCapacityAtTime(0L));
- // Assert.assertFalse(periodicVector.addInterval(
- // new ReservationInterval(7L, 12L), Resource.newInstance(8, 8)));
+ Assert.assertFalse(periodicVector.addInterval(
+ new ReservationInterval(7L, 12L), Resource.newInstance(8, 8)));
}
public void testRemoveInterval() {
- int[] alloc = { 2, 5, 3, 4, 0 };
- long[] timeSteps = { 1L, 3L, 5L, 7L, 9L };
- RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
- .generateRLESparseResourceAllocation(alloc, timeSteps);
+ int[] alloc = {2, 5, 3, 4, 0};
+ long[] timeSteps = {1L, 3L, 5L, 7L, 9L};
+ RLESparseResourceAllocation rleSparseVector =
+ ReservationSystemTestUtil.generateRLESparseResourceAllocation(
+ alloc, timeSteps);
PeriodicRLESparseResourceAllocation periodicVector =
new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L);
ReservationInterval interval = new ReservationInterval(3L, 7L);
- Assert.assertTrue(
- periodicVector.removeInterval(interval, Resource.newInstance(3, 3)));
+ Assert.assertTrue(periodicVector.removeInterval(
+ interval, Resource.newInstance(3, 3)));
Assert.assertEquals(Resource.newInstance(2, 2),
periodicVector.getCapacityAtTime(1L));
Assert.assertEquals(Resource.newInstance(2, 2),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
index 0027ceb..bfe46e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
@@ -40,14 +40,10 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Testing the class {@link RLESparseResourceAllocation}.
- */
-@SuppressWarnings("checkstyle:nowhitespaceafter")
public class TestRLESparseResourceAllocation {
- private static final Logger LOG =
- LoggerFactory.getLogger(TestRLESparseResourceAllocation.class);
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestRLESparseResourceAllocation.class);
@Test
public void testMergeAdd() throws PlanningException {
@@ -200,8 +196,7 @@ public class TestRLESparseResourceAllocation {
// Expected!
}
- // Testing that the subtractTestNonNegative detects problems even if only
- // one
+ // Testing that the subtractTestNonNegative detects problems even if only one
// of the resource dimensions is "<0"
a.put(10L, Resource.newInstance(10, 5));
b.put(11L, Resource.newInstance(5, 6));
@@ -291,8 +286,9 @@ public class TestRLESparseResourceAllocation {
public void testRangeOverlapping() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
- RLESparseResourceAllocation r = new RLESparseResourceAllocation(resCalc);
- int[] alloc = { 10, 10, 10, 10, 10, 10 };
+ RLESparseResourceAllocation r =
+ new RLESparseResourceAllocation(resCalc);
+ int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
Set<Entry<ReservationInterval, Resource>> inputs =
generateAllocation(start, alloc, false).entrySet();
@@ -303,9 +299,9 @@ public class TestRLESparseResourceAllocation {
long d = r.getLatestNonNullTime();
// tries to trigger "out-of-range" bug
- r = r.getRangeOverlapping(s, d);
- r = r.getRangeOverlapping(s - 1, d - 1);
- r = r.getRangeOverlapping(s + 1, d + 1);
+ r = r.getRangeOverlapping(s, d);
+ r = r.getRangeOverlapping(s-1, d-1);
+ r = r.getRangeOverlapping(s+1, d+1);
}
@Test
@@ -374,29 +370,25 @@ public class TestRLESparseResourceAllocation {
// Current bug prevents this to pass. The RLESparseResourceAllocation
// does not handle removal of "partial"
// allocations correctly.
- Assert.assertEquals(102400,
- rleSparseVector.getCapacityAtTime(10).getMemorySize());
- Assert.assertEquals(0,
- rleSparseVector.getCapacityAtTime(13).getMemorySize());
- Assert.assertEquals(0,
- rleSparseVector.getCapacityAtTime(19).getMemorySize());
- Assert.assertEquals(102400,
- rleSparseVector.getCapacityAtTime(21).getMemorySize());
- Assert.assertEquals(2 * 102400,
- rleSparseVector.getCapacityAtTime(26).getMemorySize());
+ Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(10)
+ .getMemorySize());
+ Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemorySize());
+ Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(19).getMemorySize());
+ Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(21)
+ .getMemorySize());
+ Assert.assertEquals(2 * 102400, rleSparseVector.getCapacityAtTime(26)
+ .getMemorySize());
ReservationInterval riRemove2 = new ReservationInterval(9, 13);
rleSparseVector.removeInterval(riRemove2, rr);
LOG.info(rleSparseVector.toString());
- Assert.assertEquals(0,
- rleSparseVector.getCapacityAtTime(11).getMemorySize());
- Assert.assertEquals(-102400,
- rleSparseVector.getCapacityAtTime(9).getMemorySize());
- Assert.assertEquals(0,
- rleSparseVector.getCapacityAtTime(13).getMemorySize());
- Assert.assertEquals(102400,
- rleSparseVector.getCapacityAtTime(20).getMemorySize());
+ Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(11).getMemorySize());
+ Assert.assertEquals(-102400, rleSparseVector.getCapacityAtTime(9)
+ .getMemorySize());
+ Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemorySize());
+ Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(20)
+ .getMemorySize());
}
@@ -508,8 +500,7 @@ public class TestRLESparseResourceAllocation {
}
mapAllocations = rleSparseVector.toIntervalMap();
Assert.assertTrue(mapAllocations.size() == 5);
- for (Entry<ReservationInterval, Resource> entry : mapAllocations
- .entrySet()) {
+ for (Entry<ReservationInterval, Resource> entry : mapAllocations.entrySet()) {
ReservationInterval interval = entry.getKey();
Resource resource = entry.getValue();
if (interval.getStartTime() == 101L) {
@@ -535,46 +526,59 @@ public class TestRLESparseResourceAllocation {
@Test
public void testMaxPeriodicCapacity() {
- long[] timeSteps = { 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L };
- int[] alloc = { 2, 5, 7, 10, 3, 4, 6, 8 };
- RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
- .generateRLESparseResourceAllocation(alloc, timeSteps);
+ long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
+ int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8};
+ RLESparseResourceAllocation rleSparseVector =
+ ReservationSystemTestUtil.generateRLESparseResourceAllocation(
+ alloc, timeSteps);
LOG.info(rleSparseVector.toString());
- Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 1),
+ Assert.assertEquals(
+ rleSparseVector.getMaximumPeriodicCapacity(0, 1),
Resource.newInstance(10, 10));
- Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 2),
+ Assert.assertEquals(
+ rleSparseVector.getMaximumPeriodicCapacity(0, 2),
Resource.newInstance(7, 7));
- Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 3),
+ Assert.assertEquals(
+ rleSparseVector.getMaximumPeriodicCapacity(0, 3),
Resource.newInstance(10, 10));
- Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 4),
+ Assert.assertEquals(
+ rleSparseVector.getMaximumPeriodicCapacity(0, 4),
Resource.newInstance(3, 3));
- Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 5),
+ Assert.assertEquals(
+ rleSparseVector.getMaximumPeriodicCapacity(0, 5),
Resource.newInstance(4, 4));
- Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(0, 5),
+ Assert.assertEquals(
+ rleSparseVector.getMaximumPeriodicCapacity(0, 5),
Resource.newInstance(4, 4));
- Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(7, 5),
+ Assert.assertEquals(
+ rleSparseVector.getMaximumPeriodicCapacity(7, 5),
Resource.newInstance(8, 8));
- Assert.assertEquals(rleSparseVector.getMaximumPeriodicCapacity(10, 3),
+ Assert.assertEquals(
+ rleSparseVector.getMaximumPeriodicCapacity(10, 3),
Resource.newInstance(0, 0));
}
@Test
public void testGetMinimumCapacityInInterval() {
- long[] timeSteps = { 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L };
- int[] alloc = { 2, 5, 7, 10, 3, 4, 0, 8 };
- RLESparseResourceAllocation rleSparseVector = ReservationSystemTestUtil
- .generateRLESparseResourceAllocation(alloc, timeSteps);
+ long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
+ int[] alloc = {2, 5, 7, 10, 3, 4, 0, 8};
+ RLESparseResourceAllocation rleSparseVector =
+ ReservationSystemTestUtil.generateRLESparseResourceAllocation(
+ alloc, timeSteps);
LOG.info(rleSparseVector.toString());
- Assert.assertEquals(rleSparseVector.getMinimumCapacityInInterval(
- new ReservationInterval(1L, 3L)), Resource.newInstance(5, 5));
- Assert.assertEquals(rleSparseVector.getMinimumCapacityInInterval(
- new ReservationInterval(2L, 5L)), Resource.newInstance(3, 3));
- Assert.assertEquals(rleSparseVector.getMinimumCapacityInInterval(
- new ReservationInterval(1L, 7L)), Resource.newInstance(0, 0));
+ Assert.assertEquals(
+ rleSparseVector.getMinimumCapacityInInterval(
+ new ReservationInterval(1L, 3L)), Resource.newInstance(5, 5));
+ Assert.assertEquals(
+ rleSparseVector.getMinimumCapacityInInterval(
+ new ReservationInterval(2L, 5L)), Resource.newInstance(3, 3));
+ Assert.assertEquals(
+ rleSparseVector.getMinimumCapacityInInterval(
+ new ReservationInterval(1L, 7L)), Resource.newInstance(0, 0));
}
- private void setupArrays(TreeMap<Long, Resource> a,
- TreeMap<Long, Resource> b) {
+ private void setupArrays(
+ TreeMap<Long, Resource> a, TreeMap<Long, Resource> b) {
a.put(10L, Resource.newInstance(5, 5));
a.put(20L, Resource.newInstance(10, 10));
a.put(30L, Resource.newInstance(15, 15));
@@ -616,8 +620,8 @@ public class TestRLESparseResourceAllocation {
numContainers = alloc[i];
}
req.put(new ReservationInterval(startTime + i, startTime + i + 1),
- ReservationSystemUtil.toResource(ReservationRequest
- .newInstance(Resource.newInstance(1024, 1), (numContainers))));
+ ReservationSystemUtil.toResource(ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), (numContainers))));
}
return req;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
index ddd290d..c4f94c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
@@ -79,10 +78,9 @@ public class TestSimpleCapacityReplanner {
enf.init("blah", conf);
// Initialize the plan with more resources
- InMemoryPlan plan = new InMemoryPlan(queueMetrics, policy, agent,
- clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", enf, true,
- YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
- context, clock);
+ InMemoryPlan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+ res, minAlloc, maxAlloc, "dedicated", enf, true, context, clock);
// add reservation filling the plan (separating them 1ms, so we are sure
// s2 follows s1 on acceptance
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/4] hadoop git commit: Revert "Plan/ResourceAllocation data
structure enhancements required to support recurring reservations in
ReservationSystem."
Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 100d38c..658387b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -18,7 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import java.util.Collections;
+import java.io.IOException;
+import java.io.StringWriter;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.gson.stream.JsonWriter;
+
/**
* This is a run length encoded sparse data structure that maintains resource
* allocations over time.
@@ -41,14 +44,12 @@ public class RLESparseResourceAllocation {
private static final int THRESHOLD = 100;
private static final Resource ZERO_RESOURCE = Resources.none();
- @SuppressWarnings("checkstyle:visibilitymodifier")
- protected NavigableMap<Long, Resource> cumulativeCapacity =
+ private NavigableMap<Long, Resource> cumulativeCapacity =
new TreeMap<Long, Resource>();
private final ReentrantReadWriteLock readWriteLock =
new ReentrantReadWriteLock();
- @SuppressWarnings("checkstyle:visibilitymodifier")
- protected final Lock readLock = readWriteLock.readLock();
+ private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private final ResourceCalculator resourceCalculator;
@@ -235,6 +236,34 @@ public class RLESparseResourceAllocation {
}
/**
+ * Returns the JSON string representation of the current resources allocated
+ * over time.
+ *
+ * @return the JSON string representation of the current resources allocated
+ * over time
+ */
+ public String toMemJSONString() {
+ StringWriter json = new StringWriter();
+ JsonWriter jsonWriter = new JsonWriter(json);
+ readLock.lock();
+ try {
+ jsonWriter.beginObject();
+ // jsonWriter.name("timestamp").value("resource");
+ for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
+ jsonWriter.name(r.getKey().toString()).value(r.getValue().toString());
+ }
+ jsonWriter.endObject();
+ jsonWriter.close();
+ return json.toString();
+ } catch (IOException e) {
+ // This should not happen
+ return "";
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
* Returns the representation of the current resources allocated over time as
* an interval map (in the defined non-null range).
*
@@ -275,7 +304,7 @@ public class RLESparseResourceAllocation {
public NavigableMap<Long, Resource> getCumulative() {
readLock.lock();
try {
- return Collections.unmodifiableNavigableMap(cumulativeCapacity);
+ return cumulativeCapacity;
} finally {
readLock.unlock();
}
@@ -408,8 +437,8 @@ public class RLESparseResourceAllocation {
Resource val = Resources.negate(e.getValue());
// test for negative value and throws
if (operator == RLEOperator.subtractTestNonNegative
- && (Resources.fitsIn(val, ZERO_RESOURCE)
- && !Resources.equals(val, ZERO_RESOURCE))) {
+ && (Resources.fitsIn(val, ZERO_RESOURCE) &&
+ !Resources.equals(val, ZERO_RESOURCE))) {
throw new PlanningException(
"RLESparseResourceAllocation: merge failed as the "
+ "resulting RLESparseResourceAllocation would be negative");
@@ -475,29 +504,22 @@ public class RLESparseResourceAllocation {
}
- /**
- * Get a {@link RLESparseResourceAllocation} view of the {@link Resource}
- * allocations between the specified start and end times.
- *
- * @param start the time from which the {@link Resource} allocations are
- * required
- * @param end the time upto which the {@link Resource} allocations are
- * required
- * @return the overlapping allocations
- */
public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
readLock.lock();
try {
NavigableMap<Long, Resource> a = this.getCumulative();
+
if (a != null && !a.isEmpty()) {
// include the portion of previous entry that overlaps start
if (start > a.firstKey()) {
long previous = a.floorKey(start);
a = a.tailMap(previous, true);
}
+
if (end < a.lastKey()) {
a = a.headMap(end, true);
}
+
}
RLESparseResourceAllocation ret =
new RLESparseResourceAllocation(a, resourceCalculator);
@@ -505,33 +527,7 @@ public class RLESparseResourceAllocation {
} finally {
readLock.unlock();
}
- }
- /**
- * This method shifts all the timestamp of the {@link Resource} entries by the
- * specified "delta".
- *
- * @param delta the time by which to shift the {@link Resource} allocations
- */
- public void shift(long delta) {
- writeLock.lock();
- try {
- TreeMap<Long, Resource> newCum = new TreeMap<>();
- long start;
- for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
- if (delta > 0) {
- start = (entry.getKey() == Long.MAX_VALUE) ? Long.MAX_VALUE
- : entry.getKey() + delta;
- } else {
- start = (entry.getKey() == Long.MIN_VALUE) ? Long.MIN_VALUE
- : entry.getKey() + delta;
- }
- newCum.put(start, entry.getValue());
- }
- cumulativeCapacity = newCum;
- } finally {
- writeLock.unlock();
- }
}
/**
@@ -545,8 +541,8 @@ public class RLESparseResourceAllocation {
/**
* Get the maximum capacity across specified time instances. The search-space
* is specified using the starting value, tick, and the periodic interval for
- * search. Maximum resource allocation across tick, tick + period, tick + 2 *
- * period,..., tick + n * period .. is returned.
+ * search. Maximum resource allocation across tick, tick + period,
+ * tick + 2 * period,..., tick + n * period .. is returned.
*
* @param tick the starting time instance
* @param period interval at which capacity is evaluated
@@ -554,19 +550,14 @@ public class RLESparseResourceAllocation {
*/
public Resource getMaximumPeriodicCapacity(long tick, long period) {
Resource maxCapacity = ZERO_RESOURCE;
- readLock.lock();
- try {
- if (!cumulativeCapacity.isEmpty()) {
- Long lastKey = cumulativeCapacity.lastKey();
- for (long t = tick; t <= lastKey; t = t + period) {
- maxCapacity = Resources.componentwiseMax(maxCapacity,
- cumulativeCapacity.floorEntry(t).getValue());
- }
+ if (!cumulativeCapacity.isEmpty()) {
+ Long lastKey = cumulativeCapacity.lastKey();
+ for (long t = tick; t <= lastKey; t = t + period) {
+ maxCapacity = Resources.componentwiseMax(maxCapacity,
+ cumulativeCapacity.floorEntry(t).getValue());
}
- return maxCapacity;
- } finally {
- readLock.unlock();
}
+ return maxCapacity;
}
/**
@@ -576,17 +567,17 @@ public class RLESparseResourceAllocation {
* @return minimum resource allocation
*/
public Resource getMinimumCapacityInInterval(ReservationInterval interval) {
- Resource minCapacity =
- Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ Resource minCapacity = Resource.newInstance(
+ Integer.MAX_VALUE, Integer.MAX_VALUE);
long start = interval.getStartTime();
long end = interval.getEndTime();
NavigableMap<Long, Resource> capacityRange =
- getRangeOverlapping(start, end).getCumulative();
+ this.getRangeOverlapping(start, end).getCumulative();
if (!capacityRange.isEmpty()) {
for (Map.Entry<Long, Resource> entry : capacityRange.entrySet()) {
if (entry.getValue() != null) {
- minCapacity =
- Resources.componentwiseMin(minCapacity, entry.getValue());
+ minCapacity = Resources.componentwiseMin(minCapacity,
+ entry.getValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
index bb4a7fb..0da95ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -24,16 +24,14 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* A ReservationAllocation represents a concrete allocation of resources over
* time that satisfy a certain {@link ReservationDefinition}. This is used
* internally by a {@link Plan} to store information about how each of the
* accepted {@link ReservationDefinition} have been allocated.
*/
-public interface ReservationAllocation
- extends Comparable<ReservationAllocation> {
+public interface ReservationAllocation extends
+ Comparable<ReservationAllocation> {
/**
* Returns the unique identifier {@link ReservationId} that represents the
@@ -42,28 +40,28 @@ public interface ReservationAllocation
* @return reservationId the unique identifier {@link ReservationId} that
* represents the reservation
*/
- ReservationId getReservationId();
+ public ReservationId getReservationId();
/**
* Returns the original {@link ReservationDefinition} submitted by the client
*
* @return the {@link ReservationDefinition} submitted by the client
*/
- ReservationDefinition getReservationDefinition();
+ public ReservationDefinition getReservationDefinition();
/**
* Returns the time at which the reservation is activated.
*
* @return the time at which the reservation is activated
*/
- long getStartTime();
+ public long getStartTime();
/**
* Returns the time at which the reservation terminates.
*
* @return the time at which the reservation terminates
*/
- long getEndTime();
+ public long getEndTime();
/**
* Returns the map of resources requested against the time interval for which
@@ -72,28 +70,28 @@ public interface ReservationAllocation
* @return the allocationRequests the map of resources requested against the
* time interval for which they were
*/
- Map<ReservationInterval, Resource> getAllocationRequests();
+ public Map<ReservationInterval, Resource> getAllocationRequests();
/**
* Return a string identifying the plan to which the reservation belongs
*
* @return the plan to which the reservation belongs
*/
- String getPlanName();
+ public String getPlanName();
/**
* Returns the user who requested the reservation
*
* @return the user who requested the reservation
*/
- String getUser();
+ public String getUser();
/**
* Returns whether the reservation has gang semantics or not
*
* @return true if there is a gang request, false otherwise
*/
- boolean containsGangs();
+ public boolean containsGangs();
/**
* Sets the time at which the reservation was accepted by the system
@@ -101,14 +99,14 @@ public interface ReservationAllocation
* @param acceptedAt the time at which the reservation was accepted by the
* system
*/
- void setAcceptanceTimestamp(long acceptedAt);
+ public void setAcceptanceTimestamp(long acceptedAt);
/**
* Returns the time at which the reservation was accepted by the system
*
* @return the time at which the reservation was accepted by the system
*/
- long getAcceptanceTime();
+ public long getAcceptanceTime();
/**
* Returns the capacity represented by cumulative resources reserved by the
@@ -118,42 +116,12 @@ public interface ReservationAllocation
* requested
* @return the resources reserved at the specified time
*/
- Resource getResourcesAtTime(long tick);
-
- /**
- * Return a RLE representation of used resources.
- *
- * @return a RLE encoding of resources allocated over time.
- */
- RLESparseResourceAllocation getResourcesOverTime();
-
+ public Resource getResourcesAtTime(long tick);
/**
* Return a RLE representation of used resources.
- *
- * @param start start of the time interval.
- * @param end end of the time interval.
* @return a RLE encoding of resources allocated over time.
*/
- RLESparseResourceAllocation getResourcesOverTime(long start, long end);
-
- /**
- * Get the periodicity of this reservation representing the time period of the
- * periodic job. Period is represented in milliseconds for periodic jobs.
- * Period is 0 for non-periodic jobs.
- *
- * @return periodicity of this reservation
- */
- long getPeriodicity();
-
- /**
- * Set the periodicity of this reservation representing the time period of the
- * periodic job. Period is represented in milliseconds for periodic jobs.
- * Period is 0 for non-periodic jobs.
- *
- * @param period periodicity of this reservation
- */
- @VisibleForTesting
- void setPeriodicity(long period);
+ public RLESparseResourceAllocation getResourcesOverTime();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
index a66d222..027d066 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
@@ -44,8 +44,6 @@ public class ReservationInputValidator {
/**
* Utility class to validate reservation requests.
- *
- * @param clock the {@link Clock} to use
*/
public ReservationInputValidator(Clock clock) {
this.clock = clock;
@@ -55,21 +53,22 @@ public class ReservationInputValidator {
ReservationId reservationId, String auditConstant) throws YarnException {
// check if the reservation id is valid
if (reservationId == null) {
- String message = "Missing reservation id."
- + " Please try again by specifying a reservation id.";
+ String message =
+ "Missing reservation id."
+ + " Please try again by specifying a reservation id.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
String queue = reservationSystem.getQueueForReservation(reservationId);
String nullQueueErrorMessage =
- "The specified reservation with ID: " + reservationId
- + " is unknown. Please try again with a valid reservation.";
+ "The specified reservation with ID: " + reservationId
+ + " is unknown. Please try again with a valid reservation.";
String nullPlanErrorMessage = "The specified reservation: " + reservationId
- + " is not associated with any valid plan."
- + " Please try again with a valid reservation.";
+ + " is not associated with any valid plan."
+ + " Please try again with a valid reservation.";
return getPlanFromQueue(reservationSystem, queue, auditConstant,
- nullQueueErrorMessage, nullPlanErrorMessage);
+ nullQueueErrorMessage, nullPlanErrorMessage);
}
private void validateReservationDefinition(ReservationId reservationId,
@@ -78,15 +77,17 @@ public class ReservationInputValidator {
String message = "";
// check if deadline is in the past
if (contract == null) {
- message = "Missing reservation definition."
- + " Please try again by specifying a reservation definition.";
+ message =
+ "Missing reservation definition."
+ + " Please try again by specifying a reservation definition.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
if (contract.getDeadline() <= clock.getTime()) {
- message = "The specified deadline: " + contract.getDeadline()
- + " is the past. Please try again with deadline in the future.";
+ message =
+ "The specified deadline: " + contract.getDeadline()
+ + " is the past. Please try again with deadline in the future.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
@@ -94,16 +95,18 @@ public class ReservationInputValidator {
// Check if at least one RR has been specified
ReservationRequests resReqs = contract.getReservationRequests();
if (resReqs == null) {
- message = "No resources have been specified to reserve."
- + "Please try again by specifying the resources to reserve.";
+ message =
+ "No resources have been specified to reserve."
+ + "Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
List<ReservationRequest> resReq = resReqs.getReservationResources();
if (resReq == null || resReq.isEmpty()) {
- message = "No resources have been specified to reserve."
- + " Please try again by specifying the resources to reserve.";
+ message =
+ "No resources have been specified to reserve."
+ + " Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
@@ -120,18 +123,22 @@ public class ReservationInputValidator {
} else {
minDuration += rr.getDuration();
}
- maxGangSize = Resources.max(plan.getResourceCalculator(),
- plan.getTotalCapacity(), maxGangSize,
- Resources.multiply(rr.getCapability(), rr.getConcurrency()));
+ maxGangSize =
+ Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(),
+ maxGangSize,
+ Resources.multiply(rr.getCapability(), rr.getConcurrency()));
}
// verify the allocation is possible (skip for ANY)
long duration = contract.getDeadline() - contract.getArrival();
- if (duration < minDuration && type != ReservationRequestInterpreter.R_ANY) {
- message = "The time difference (" + (duration) + ") between arrival ("
- + contract.getArrival() + ") " + "and deadline ("
- + contract.getDeadline() + ") must "
- + " be greater or equal to the minimum resource duration ("
- + minDuration + ")";
+ if (duration < minDuration
+ && type != ReservationRequestInterpreter.R_ANY) {
+ message =
+ "The time difference ("
+ + (duration)
+ + ") between arrival (" + contract.getArrival() + ") "
+ + "and deadline (" + contract.getDeadline() + ") must "
+ + " be greater or equal to the minimum resource duration ("
+ + minDuration + ")";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
@@ -141,9 +148,10 @@ public class ReservationInputValidator {
if (Resources.greaterThan(plan.getResourceCalculator(),
plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
&& type != ReservationRequestInterpreter.R_ANY) {
- message = "The size of the largest gang in the reservation definition ("
- + maxGangSize + ") exceed the capacity available ("
- + plan.getTotalCapacity() + " )";
+ message =
+ "The size of the largest gang in the reservation definition ("
+ + maxGangSize + ") exceed the capacity available ("
+ + plan.getTotalCapacity() + " )";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
@@ -171,32 +179,32 @@ public class ReservationInputValidator {
}
}
- private Plan getPlanFromQueue(ReservationSystem reservationSystem,
- String queue, String auditConstant) throws YarnException {
+ private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
+ queue, String auditConstant) throws YarnException {
String nullQueueErrorMessage = "The queue is not specified."
- + " Please try again with a valid reservable queue.";
+ + " Please try again with a valid reservable queue.";
String nullPlanErrorMessage = "The specified queue: " + queue
- + " is not managed by reservation system."
- + " Please try again with a valid reservable queue.";
+ + " is not managed by reservation system."
+ + " Please try again with a valid reservable queue.";
return getPlanFromQueue(reservationSystem, queue, auditConstant,
- nullQueueErrorMessage, nullPlanErrorMessage);
+ nullQueueErrorMessage, nullPlanErrorMessage);
}
- private Plan getPlanFromQueue(ReservationSystem reservationSystem,
- String queue, String auditConstant, String nullQueueErrorMessage,
- String nullPlanErrorMessage) throws YarnException {
+ private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
+ queue, String auditConstant, String nullQueueErrorMessage,
+ String nullPlanErrorMessage) throws YarnException {
if (queue == null || queue.isEmpty()) {
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
- "validate reservation input", "ClientRMService",
- nullQueueErrorMessage);
+ "validate reservation input", "ClientRMService",
+ nullQueueErrorMessage);
throw RPCUtil.getRemoteException(nullQueueErrorMessage);
}
// check if the associated plan is valid
Plan plan = reservationSystem.getPlan(queue);
if (plan == null) {
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
- "validate reservation input", "ClientRMService",
- nullPlanErrorMessage);
+ "validate reservation input", "ClientRMService",
+ nullPlanErrorMessage);
throw RPCUtil.getRemoteException(nullPlanErrorMessage);
}
return plan;
@@ -214,21 +222,22 @@ public class ReservationInputValidator {
* @param reservationId the {@link ReservationId} associated with the current
* request
* @return the {@link Plan} to submit the request to
- * @throws YarnException if validation fails
+ * @throws YarnException
*/
public Plan validateReservationSubmissionRequest(
- ReservationSystem reservationSystem, ReservationSubmissionRequest request,
- ReservationId reservationId) throws YarnException {
+ ReservationSystem reservationSystem,
+ ReservationSubmissionRequest request, ReservationId reservationId)
+ throws YarnException {
String message;
if (reservationId == null) {
- message = "Reservation id cannot be null. Please try again specifying "
- + " a valid reservation id by creating a new reservation id.";
+ message = "Reservation id cannot be null. Please try again " +
+ "specifying a valid reservation id by creating a new reservation id.";
throw RPCUtil.getRemoteException(message);
}
// Check if it is a managed queue
String queue = request.getQueue();
Plan plan = getPlanFromQueue(reservationSystem, queue,
- AuditConstants.SUBMIT_RESERVATION_REQUEST);
+ AuditConstants.SUBMIT_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
@@ -246,14 +255,15 @@ public class ReservationInputValidator {
* @param request the {@link ReservationUpdateRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
- * @throws YarnException if validation fails
+ * @throws YarnException
*/
public Plan validateReservationUpdateRequest(
ReservationSystem reservationSystem, ReservationUpdateRequest request)
throws YarnException {
ReservationId reservationId = request.getReservationId();
- Plan plan = validateReservation(reservationSystem, reservationId,
- AuditConstants.UPDATE_RESERVATION_REQUEST);
+ Plan plan =
+ validateReservation(reservationSystem, reservationId,
+ AuditConstants.UPDATE_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.UPDATE_RESERVATION_REQUEST);
@@ -268,26 +278,28 @@ public class ReservationInputValidator {
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationListRequest} defining search
- * parameters for reservations in the {@link ReservationSystem} that
- * is being validated against.
+ * parameters for reservations in the {@link ReservationSystem}
+ * that is being validated against.
* @return the {@link Plan} to list reservations of.
- * @throws YarnException if validation fails
+ * @throws YarnException
*/
public Plan validateReservationListRequest(
- ReservationSystem reservationSystem, ReservationListRequest request)
+ ReservationSystem reservationSystem,
+ ReservationListRequest request)
throws YarnException {
String queue = request.getQueue();
if (request.getEndTime() < request.getStartTime()) {
- String errorMessage = "The specified end time must be greater than "
- + "the specified start time.";
+ String errorMessage = "The specified end time must be greater than " +
+ "the specified start time.";
RMAuditLogger.logFailure("UNKNOWN",
- AuditConstants.LIST_RESERVATION_REQUEST,
- "validate list reservation input", "ClientRMService", errorMessage);
+ AuditConstants.LIST_RESERVATION_REQUEST,
+ "validate list reservation input", "ClientRMService",
+ errorMessage);
throw RPCUtil.getRemoteException(errorMessage);
}
// Check if it is a managed queue
return getPlanFromQueue(reservationSystem, queue,
- AuditConstants.LIST_RESERVATION_REQUEST);
+ AuditConstants.LIST_RESERVATION_REQUEST);
}
/**
@@ -300,7 +312,7 @@ public class ReservationInputValidator {
* @param request the {@link ReservationDeleteRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
- * @throws YarnException if validation fails
+ * @throws YarnException
*/
public Plan validateReservationDeleteRequest(
ReservationSystem reservationSystem, ReservationDeleteRequest request)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
index a6c8fcf..8b62972 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import java.util.Map;
-
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -31,6 +29,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
+import java.util.Map;
+
/**
* This interface is the one implemented by any system that wants to support
* Reservations i.e. make {@code Resource} allocations in future. Implementors
@@ -57,7 +57,7 @@ public interface ReservationSystem extends Recoverable {
*
* @param conf configuration
* @param rmContext current context of the {@code ResourceManager}
- * @throws YarnException if initialization of the configured plan fails
+ * @throws YarnException
*/
void reinitialize(Configuration conf, RMContext rmContext)
throws YarnException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
index cbf0f38..e458055 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
@@ -38,7 +38,7 @@ public interface SharingPolicy {
* @param planQueuePath the name of the queue for this plan
* @param conf the system configuration
*/
- void init(String planQueuePath, ReservationSchedulerConfiguration conf);
+ public void init(String planQueuePath, ReservationSchedulerConfiguration conf);
/**
* This method runs the policy validation logic, and return true/false on
@@ -51,7 +51,7 @@ public interface SharingPolicy {
* @throws PlanningException if the policy is respected if we add this
* {@link ReservationAllocation} to the {@link Plan}
*/
- void validate(Plan plan, ReservationAllocation newAllocation)
+ public void validate(Plan plan, ReservationAllocation newAllocation)
throws PlanningException;
/**
@@ -68,13 +68,9 @@ public interface SharingPolicy {
* @param start the start time for the range we are querying
* @param end the end time for the range we are querying
* @param oldId (optional) the id of a reservation being updated
- *
- * @return the available resources expressed as a
- * {@link RLESparseResourceAllocation}
- *
* @throws PlanningException throws if the request is not valid
*/
- RLESparseResourceAllocation availableResources(
+ public RLESparseResourceAllocation availableResources(
RLESparseResourceAllocation available, Plan plan, String user,
ReservationId oldId, long start, long end) throws PlanningException;
@@ -86,6 +82,7 @@ public interface SharingPolicy {
*
* @return validWindow the window of validity considered by the policy.
*/
- long getValidWindow();
+ public long getValidWindow();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
index af0e712..abac6ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
@@ -34,7 +34,7 @@ public interface Planner {
*
* @param plan the {@link Plan} to replan
* @param contracts the list of reservation requests
- * @throws PlanningException if operation is unsuccessful
+ * @throws PlanningException
*/
public void plan(Plan plan, List<ReservationDefinition> contracts)
throws PlanningException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
index bbbf0d6..199bfa5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -50,7 +50,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
* @return whether the allocateUser function was successful or not
*
* @throws PlanningException if the session cannot be fitted into the plan
- * @throws ContractValidationException if validation fails
+ * @throws ContractValidationException
*/
protected boolean allocateUser(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
index 8934b0f..ec6d9c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -50,7 +50,7 @@ public interface StageAllocator {
*
* @return The computed allocation (or null if the stage could not be
* allocated)
- * @throws PlanningException if operation is unsuccessful
+ * @throws PlanningException
*/
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
RLESparseResourceAllocation planLoads,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
index d107487..da04336 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -69,7 +69,7 @@ public class StageAllocatorGreedy implements StageAllocator {
RLESparseResourceAllocation netAvailable =
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
- stageDeadline, 0);
+ stageDeadline);
netAvailable =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
index ae7d91a..ec83e02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
@@ -83,8 +83,9 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
// get available resources from plan
- RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
- user, oldId, stageEarliestStart, stageDeadline, 0);
+ RLESparseResourceAllocation netRLERes =
+ plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
+ stageDeadline);
// remove plan modifications
netRLERes =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index c014549..e45f58c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -77,8 +77,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
ResourceCalculator resCalc = plan.getResourceCalculator();
Resource capacity = plan.getTotalCapacity();
- RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
- user, oldId, stageArrival, stageDeadline, 0);
+ RLESparseResourceAllocation netRLERes = plan
+ .getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline);
long step = plan.getStep();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index 5337e06..e99842e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -19,10 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anySetOf;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import java.io.FileWriter;
import java.io.IOException;
@@ -79,8 +76,7 @@ public class ReservationSystemTestUtil {
String reservationQ, long timeWindow, float instConstraint,
float avgConstraint) {
- ReservationSchedulerConfiguration realConf =
- new CapacitySchedulerConfiguration();
+ ReservationSchedulerConfiguration realConf = new CapacitySchedulerConfiguration();
ReservationSchedulerConfiguration conf = spy(realConf);
when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
when(conf.getInstantaneousMaxCapacity(reservationQ))
@@ -172,6 +168,7 @@ public class ReservationSystemTestUtil {
scheduler.start();
scheduler.reinitialize(conf, rmContext);
+
Resource resource =
ReservationSystemTestUtil.calculateClusterResource(numContainers);
RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
@@ -187,16 +184,10 @@ public class ReservationSystemTestUtil {
public static ReservationDefinition createSimpleReservationDefinition(
long arrival, long deadline, long duration, int parallelism) {
- return createSimpleReservationDefinition(arrival, deadline, duration,
- parallelism, null);
- }
-
- public static ReservationDefinition createSimpleReservationDefinition(
- long arrival, long deadline, long duration, int parallelism,
- String recurrenceExpression) {
// create a request with a single atomic ask
- ReservationRequest r = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), parallelism, parallelism, duration);
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ parallelism, parallelism, duration);
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
@@ -204,31 +195,32 @@ public class ReservationSystemTestUtil {
rDef.setReservationRequests(reqs);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
- if (recurrenceExpression != null) {
- rDef.setRecurrenceExpression(recurrenceExpression);
- }
return rDef;
}
public static ReservationSubmissionRequest createSimpleReservationRequest(
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration) {
- return createSimpleReservationRequest(reservationId, numContainers, arrival,
- deadline, duration, Priority.UNDEFINED);
+ return createSimpleReservationRequest(reservationId, numContainers,
+ arrival, deadline, duration, Priority.UNDEFINED);
}
public static ReservationSubmissionRequest createSimpleReservationRequest(
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration, Priority priority) {
// create a request with a single atomic ask
- ReservationRequest r = ReservationRequest
- .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration);
- ReservationRequests reqs = ReservationRequests.newInstance(
- Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
- ReservationDefinition rDef = ReservationDefinition.newInstance(arrival,
- deadline, reqs, "testClientRMService#reservation", "0", priority);
- ReservationSubmissionRequest request = ReservationSubmissionRequest
- .newInstance(rDef, reservationQ, reservationId);
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ numContainers, 1, duration);
+ ReservationRequests reqs =
+ ReservationRequests.newInstance(Collections.singletonList(r),
+ ReservationRequestInterpreter.R_ALL);
+ ReservationDefinition rDef =
+ ReservationDefinition.newInstance(arrival, deadline, reqs,
+ "testClientRMService#reservation", "0", priority);
+ ReservationSubmissionRequest request =
+ ReservationSubmissionRequest.newInstance(rDef,
+ reservationQ, reservationId);
return request;
}
@@ -260,9 +252,9 @@ public class ReservationSystemTestUtil {
return cs;
}
- @SuppressWarnings("rawtypes")
- public static void initializeRMContext(int numContainers,
- AbstractYarnScheduler scheduler, RMContext mockRMContext) {
+ @SuppressWarnings("rawtypes") public static void initializeRMContext(
+ int numContainers, AbstractYarnScheduler scheduler,
+ RMContext mockRMContext) {
when(mockRMContext.getScheduler()).thenReturn(scheduler);
Resource r = calculateClusterResource(numContainers);
@@ -270,25 +262,26 @@ public class ReservationSystemTestUtil {
}
public static RMContext createRMContext(Configuration conf) {
- RMContext mockRmContext = Mockito.spy(new RMContextImpl(null, null, null,
- null, null, null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null));
+ RMContext mockRmContext = Mockito.spy(
+ new RMContextImpl(null, null, null, null, null, null,
+ new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null));
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
when(nlm.getQueueResource(any(String.class), anySetOf(String.class),
- any(Resource.class))).thenAnswer(new Answer<Resource>() {
- @Override
- public Resource answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- return (Resource) args[2];
- }
- });
+ any(Resource.class))).thenAnswer(new Answer<Resource>() {
+ @Override public Resource answer(InvocationOnMock invocation)
+ throws Throwable {
+ Object[] args = invocation.getArguments();
+ return (Resource) args[2];
+ }
+ });
when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
.thenAnswer(new Answer<Resource>() {
- @Override
- public Resource answer(InvocationOnMock invocation) throws Throwable {
+ @Override public Resource answer(InvocationOnMock invocation)
+ throws Throwable {
Object[] args = invocation.getArguments();
return (Resource) args[1];
}
@@ -311,8 +304,9 @@ public class ReservationSystemTestUtil {
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
- final String dedicated = CapacitySchedulerConfiguration.ROOT
- + CapacitySchedulerConfiguration.DOT + reservationQ;
+ final String dedicated =
+ CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT
+ + reservationQ;
conf.setCapacity(dedicated, 80);
// Set as reservation queue
conf.setReservable(dedicated, true);
@@ -411,55 +405,26 @@ public class ReservationSystemTestUtil {
public static Map<ReservationInterval, Resource> generateAllocation(
long startTime, long step, int[] alloc) {
- return generateAllocation(startTime, step, alloc, null);
- }
-
- public static Map<ReservationInterval, Resource> generateAllocation(
- long startTime, long step, int[] alloc, String recurrenceExpression) {
Map<ReservationInterval, Resource> req = new TreeMap<>();
-
- long period = 0;
- if (recurrenceExpression != null) {
- period = Long.parseLong(recurrenceExpression);
- }
-
- long rStart;
- long rEnd;
- for (int j = 0; j < 86400000; j += period) {
- for (int i = 0; i < alloc.length; i++) {
- rStart = (startTime + i * step) + j * period;
- rEnd = (startTime + (i + 1) * step) + j * period;
- if (period > 0) {
- rStart = rStart % period + j * period;
- rEnd = rEnd % period + j * period;
- if (rStart > rEnd) {
- // skip wrap-around entry
- continue;
- }
- }
-
- req.put(new ReservationInterval(rStart, rEnd),
- ReservationSystemUtil.toResource(ReservationRequest
- .newInstance(Resource.newInstance(1024, 1), alloc[i])));
-
- }
- // execute only once if non-periodic
- if (period == 0) {
- break;
- }
+ for (int i = 0; i < alloc.length; i++) {
+ req.put(new ReservationInterval(startTime + i * step,
+ startTime + (i + 1) * step), ReservationSystemUtil.toResource(
+ ReservationRequest
+ .newInstance(Resource.newInstance(1024, 1), alloc[i])));
}
return req;
}
- public static RLESparseResourceAllocation generateRLESparseResourceAllocation(
- int[] alloc, long[] timeSteps) {
+ public static RLESparseResourceAllocation
+ generateRLESparseResourceAllocation(int[] alloc, long[] timeSteps) {
TreeMap<Long, Resource> allocationsMap = new TreeMap<>();
for (int i = 0; i < alloc.length; i++) {
allocationsMap.put(timeSteps[i],
Resource.newInstance(alloc[i], alloc[i]));
}
- RLESparseResourceAllocation rleVector = new RLESparseResourceAllocation(
- allocationsMap, new DefaultResourceCalculator());
+ RLESparseResourceAllocation rleVector =
+ new RLESparseResourceAllocation(allocationsMap,
+ new DefaultResourceCalculator());
return rleVector;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[4/4] hadoop git commit: HDFS-12182. BlockManager.metaSave does not
distinguish between "under replicated" and "missing" blocks. Contributed by
Wellington Chevreuil.
Posted by we...@apache.org.
HDFS-12182. BlockManager.metaSave does not distinguish between "under replicated" and "missing" blocks. Contributed by Wellington Chevreuil.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a2f3e78
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a2f3e78
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a2f3e78
Branch: refs/heads/branch-2
Commit: 3a2f3e78fff443033078b17b18c936b91c5ec799
Parents: 92d9ad7
Author: Wei-Chiu Chuang <we...@apache.org>
Authored: Tue Sep 5 14:58:08 2017 -0700
Committer: Wei-Chiu Chuang <we...@apache.org>
Committed: Tue Sep 5 14:58:08 2017 -0700
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 29 +++++++++--
.../blockmanagement/TestBlockManager.java | 55 ++++++++++++++++++++
.../hdfs/server/namenode/TestMetaSave.java | 4 +-
3 files changed, 82 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a2f3e78/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 9bfbca8..4f0ec43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -605,17 +605,36 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.fetchDatanodes(live, dead, false);
out.println("Live Datanodes: " + live.size());
out.println("Dead Datanodes: " + dead.size());
+
//
- // Dump contents of neededReplication
+ // Need to iterate over all queues from neededReplications
+ // (except for the QUEUE_WITH_CORRUPT_BLOCKS)
//
synchronized (neededReplications) {
- out.println("Metasave: Blocks waiting for replication: " +
- neededReplications.size());
- for (Block block : neededReplications) {
+ out.println("Metasave: Blocks waiting for reconstruction: "
+ + neededReplications.getUnderReplicatedBlockCount());
+ for (int i = 0; i < neededReplications.LEVEL; i++) {
+ if (i != neededReplications.QUEUE_WITH_CORRUPT_BLOCKS) {
+ for (Iterator<BlockInfo> it = neededReplications.iterator(i);
+ it.hasNext();) {
+ Block block = it.next();
+ dumpBlockMeta(block, out);
+ }
+ }
+ }
+ //
+ // Now print corrupt blocks separately
+ //
+ out.println("Metasave: Blocks currently missing: " +
+ neededReplications.getCorruptBlockSize());
+ for (Iterator<BlockInfo> it = neededReplications.
+ iterator(neededReplications.QUEUE_WITH_CORRUPT_BLOCKS);
+ it.hasNext();) {
+ Block block = it.next();
dumpBlockMeta(block, out);
}
}
-
+
// Dump any postponed over-replicated blocks
out.println("Mis-replicated blocks that have been postponed:");
for (Block block : postponedMisreplicatedBlocks) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a2f3e78/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index f4fe38c..ad84805 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -1283,4 +1283,59 @@ public class TestBlockManager {
isReplicaCorrupt(Mockito.any(BlockInfo.class),
Mockito.any(DatanodeDescriptor.class));
}
+
+ @Test
+ public void testMetaSaveMissingReplicas() throws Exception {
+ List<DatanodeStorageInfo> origStorages = getStorages(0, 1);
+ List<DatanodeDescriptor> origNodes = getNodes(origStorages);
+ BlockInfo block = makeBlockReplicasMissing(0, origNodes);
+ File file = new File("test.log");
+ PrintWriter out = new PrintWriter(file);
+ bm.metaSave(out);
+ out.flush();
+ FileInputStream fstream = new FileInputStream(file);
+ DataInputStream in = new DataInputStream(fstream);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ StringBuffer buffer = new StringBuffer();
+ String line;
+ try {
+ while ((line = reader.readLine()) != null) {
+ buffer.append(line);
+ }
+ String output = buffer.toString();
+ assertTrue("Metasave output should have reported missing blocks.",
+ output.contains("Metasave: Blocks currently missing: 1"));
+ assertTrue("There should be 0 blocks waiting for reconstruction",
+ output.contains("Metasave: Blocks waiting for reconstruction: 0"));
+ String blockNameGS = block.getBlockName() + "_" +
+ block.getGenerationStamp();
+ assertTrue("Block " + blockNameGS + " should be MISSING.",
+ output.contains(blockNameGS + " MISSING"));
+ } finally {
+ reader.close();
+ file.delete();
+ }
+ }
+
+ private BlockInfo makeBlockReplicasMissing(long blockId,
+ List<DatanodeDescriptor> nodesList) throws IOException {
+ long inodeId = ++mockINodeId;
+ final INodeFile bc = TestINodeFile.createINodeFile(inodeId);
+
+ BlockInfo blockInfo = blockOnNodes(blockId, nodesList);
+ blockInfo.setReplication((short) 3);
+ blockInfo.setBlockCollectionId(inodeId);
+
+ Mockito.doReturn(bc).when(fsn).getBlockCollection(inodeId);
+ bm.blocksMap.addBlockCollection(blockInfo, bc);
+ bm.markBlockReplicasAsCorrupt(blockInfo,
+ blockInfo.getGenerationStamp() + 1,
+ blockInfo.getNumBytes(),
+ new DatanodeStorageInfo[]{});
+ BlockCollection mockedBc = Mockito.mock(BlockCollection.class);
+ Mockito.when(mockedBc.getBlocks()).thenReturn(new BlockInfo[]{blockInfo});
+ bm.checkReplication(mockedBc);
+ return blockInfo;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a2f3e78/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java
index 14f9382..22691c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java
@@ -162,7 +162,9 @@ public class TestMetaSave {
line = reader.readLine();
assertTrue(line.equals("Dead Datanodes: 1"));
line = reader.readLine();
- assertTrue(line.equals("Metasave: Blocks waiting for replication: 0"));
+ assertTrue(line.equals("Metasave: Blocks waiting for reconstruction: 0"));
+ line = reader.readLine();
+ assertTrue(line.equals("Metasave: Blocks currently missing: 0"));
line = reader.readLine();
assertTrue(line.equals("Mis-replicated blocks that have been postponed:"));
line = reader.readLine();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/4] hadoop git commit: Revert "Plan/ResourceAllocation data
structure enhancements required to support recurring reservations in
ReservationSystem."
Posted by we...@apache.org.
Revert "Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem."
This reverts commit 80516b3de79e9aac4dba1374638257b4611f199e.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92d9ad77
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92d9ad77
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92d9ad77
Branch: refs/heads/branch-2
Commit: 92d9ad77612c185c5b8ccabc59fdf2118e07f025
Parents: f382f66
Author: Wei-Chiu Chuang <we...@apache.org>
Authored: Tue Sep 5 14:52:43 2017 -0700
Committer: Wei-Chiu Chuang <we...@apache.org>
Committed: Tue Sep 5 14:52:43 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 6 -
.../yarn/conf/TestYarnConfigurationFields.java | 4 +-
.../reservation/AbstractReservationSystem.java | 90 ++--
.../AbstractSchedulerPlanFollower.java | 183 ++++----
.../reservation/InMemoryPlan.java | 400 +++++------------
.../InMemoryReservationAllocation.java | 36 +-
.../reservation/NoOverCommitPolicy.java | 2 +-
.../PeriodicRLESparseResourceAllocation.java | 130 ++----
.../resourcemanager/reservation/PlanEdit.java | 24 +-
.../resourcemanager/reservation/PlanView.java | 94 ++--
.../RLESparseResourceAllocation.java | 115 +++--
.../reservation/ReservationAllocation.java | 60 +--
.../reservation/ReservationInputValidator.java | 134 +++---
.../reservation/ReservationSystem.java | 6 +-
.../reservation/SharingPolicy.java | 13 +-
.../reservation/planning/Planner.java | 2 +-
.../reservation/planning/PlanningAlgorithm.java | 2 +-
.../reservation/planning/StageAllocator.java | 2 +-
.../planning/StageAllocatorGreedy.java | 2 +-
.../planning/StageAllocatorGreedyRLE.java | 5 +-
.../planning/StageAllocatorLowCostAligned.java | 4 +-
.../reservation/ReservationSystemTestUtil.java | 135 +++---
.../reservation/TestInMemoryPlan.java | 431 +++++++------------
...TestPeriodicRLESparseResourceAllocation.java | 109 ++---
.../TestRLESparseResourceAllocation.java | 122 +++---
.../planning/TestSimpleCapacityReplanner.java | 8 +-
26 files changed, 777 insertions(+), 1342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fb9f499..7b397f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -254,12 +254,6 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
1000L;
- /** The maximum periodicity for the Reservation System. */
- public static final String RM_RESERVATION_SYSTEM_MAX_PERIODICITY =
- RM_PREFIX + "reservation-system.max-periodicity";
- public static final long DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY =
- 86400000L;
-
/**
* Enable periodic monitor threads.
* @see #RM_SCHEDULER_MONITOR_POLICIES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 8d2411d..ee97fb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.conf.TestConfigurationFieldsBase;
*/
public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
- @SuppressWarnings({"deprecation", "methodlength"})
+ @SuppressWarnings("deprecation")
@Override
public void initializeMemberVariables() {
xmlFilename = new String("yarn-default.xml");
@@ -67,8 +67,6 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration
.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL);
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
- configurationPropsToSkipCompare
- .add(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
// Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
index 5b8772c..5ef4912 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -18,17 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -57,6 +46,17 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
/**
* This is the implementation of {@link ReservationSystem} based on the
* {@link ResourceScheduler}
@@ -66,8 +66,8 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractReservationSystem extends AbstractService
implements ReservationSystem {
- private static final Logger LOG =
- LoggerFactory.getLogger(AbstractReservationSystem.class);
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AbstractReservationSystem.class);
// private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN
@@ -103,8 +103,6 @@ public abstract class AbstractReservationSystem extends AbstractService
private boolean isRecoveryEnabled = false;
- private long maxPeriodicity;
-
/**
* Construct the service.
*
@@ -145,41 +143,36 @@ public abstract class AbstractReservationSystem extends AbstractService
this.conf = conf;
scheduler = rmContext.getScheduler();
// Get the plan step size
- planStepSize = conf.getTimeDuration(
- YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
- YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
- TimeUnit.MILLISECONDS);
+ planStepSize =
+ conf.getTimeDuration(
+ YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+ YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+ TimeUnit.MILLISECONDS);
if (planStepSize < 0) {
planStepSize =
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
}
- maxPeriodicity =
- conf.getLong(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
- YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
- if (maxPeriodicity <= 0) {
- maxPeriodicity =
- YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY;
- }
// Create a plan corresponding to every reservable queue
Set<String> planQueueNames = scheduler.getPlanQueues();
for (String planQueueName : planQueueNames) {
Plan plan = initializePlan(planQueueName);
plans.put(planQueueName, plan);
}
- isRecoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
+ isRecoveryEnabled = conf.getBoolean(
+ YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
if (conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE,
- YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE)
- && conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
- YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
+ YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) &&
+ conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
+ YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
reservationsACLsManager = new ReservationsACLsManager(scheduler, conf);
}
}
private void loadPlan(String planName,
Map<ReservationId, ReservationAllocationStateProto> reservations)
- throws PlanningException {
+ throws PlanningException {
Plan plan = plans.get(planName);
Resource minAllocation = getMinAllocation();
ResourceCalculator rescCalculator = getResourceCalculator();
@@ -255,8 +248,8 @@ public abstract class AbstractReservationSystem extends AbstractService
Class<?> planFollowerPolicyClazz =
conf.getClassByName(planFollowerPolicyClassName);
if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
- return (PlanFollower) ReflectionUtils
- .newInstance(planFollowerPolicyClazz, conf);
+ return (PlanFollower) ReflectionUtils.newInstance(
+ planFollowerPolicyClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
+ " not instance of " + PlanFollower.class.getCanonicalName());
@@ -264,8 +257,7 @@ public abstract class AbstractReservationSystem extends AbstractService
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate PlanFollowerPolicy: "
- + planFollowerPolicyClassName,
- e);
+ + planFollowerPolicyClassName, e);
}
}
@@ -379,8 +371,9 @@ public abstract class AbstractReservationSystem extends AbstractService
public ReservationId getNewReservationId() {
writeLock.lock();
try {
- ReservationId resId = ReservationId.newInstance(
- ResourceManager.getClusterTimeStamp(), resCounter.incrementAndGet());
+ ReservationId resId =
+ ReservationId.newInstance(ResourceManager.getClusterTimeStamp(),
+ resCounter.incrementAndGet());
LOG.info("Allocated new reservationId: " + resId);
return resId;
} finally {
@@ -397,11 +390,8 @@ public abstract class AbstractReservationSystem extends AbstractService
* Get the default reservation system corresponding to the scheduler
*
* @param scheduler the scheduler for which the reservation system is required
- *
- * @return the {@link ReservationSystem} based on the configured scheduler
*/
- public static String getDefaultReservationSystem(
- ResourceScheduler scheduler) {
+ public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
if (scheduler instanceof CapacityScheduler) {
return CapacityReservationSystem.class.getName();
} else if (scheduler instanceof FairScheduler) {
@@ -419,11 +409,12 @@ public abstract class AbstractReservationSystem extends AbstractService
Resource maxAllocation = getMaxAllocation();
ResourceCalculator rescCalc = getResourceCalculator();
Resource totCap = getPlanQueueCapacity(planQueueName);
- Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy,
- getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation,
- maxAllocation, planQueueName, getReplanner(planQueuePath),
- getReservationSchedulerConfiguration().getMoveOnExpiry(planQueuePath),
- maxPeriodicity, rmContext);
+ Plan plan =
+ new InMemoryPlan(getRootQueueMetrics(), adPolicy,
+ getAgent(planQueuePath), totCap, planStepSize, rescCalc,
+ minAllocation, maxAllocation, planQueueName,
+ getReplanner(planQueuePath), getReservationSchedulerConfiguration()
+ .getMoveOnExpiry(planQueuePath), rmContext);
LOG.info("Initialized plan {} based on reservable queue {}",
plan.toString(), planQueueName);
return plan;
@@ -486,8 +477,8 @@ public abstract class AbstractReservationSystem extends AbstractService
Class<?> admissionPolicyClazz =
conf.getClassByName(admissionPolicyClassName);
if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
- return (SharingPolicy) ReflectionUtils.newInstance(admissionPolicyClazz,
- conf);
+ return (SharingPolicy) ReflectionUtils.newInstance(
+ admissionPolicyClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + admissionPolicyClassName
+ " not instance of " + SharingPolicy.class.getCanonicalName());
@@ -502,7 +493,8 @@ public abstract class AbstractReservationSystem extends AbstractService
return this.reservationsACLsManager;
}
- protected abstract ReservationSchedulerConfiguration getReservationSchedulerConfiguration();
+ protected abstract ReservationSchedulerConfiguration
+ getReservationSchedulerConfiguration();
protected abstract String getPlanQueuePath(String planQueueName);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
index 9b6a0b0..90357e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
@@ -18,14 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -41,17 +33,24 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
- private static final Logger LOG =
- LoggerFactory.getLogger(AbstractSchedulerPlanFollower.class);
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AbstractSchedulerPlanFollower.class);
protected Collection<Plan> plans = new ArrayList<Plan>();
protected YarnScheduler scheduler;
protected Clock clock;
@Override
- public void init(Clock clock, ResourceScheduler sched,
- Collection<Plan> plans) {
+ public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
this.clock = clock;
this.scheduler = sched;
this.plans.addAll(plans);
@@ -72,7 +71,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
@Override
public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) {
- String planQueueName = plan.getQueueName();
+ String planQueueName = plan.getQueueName();
if (LOG.isDebugEnabled()) {
LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
}
@@ -83,14 +82,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
now += step - (now % step);
}
Queue planQueue = getPlanQueue(planQueueName);
- if (planQueue == null) {
- return;
- }
+ if (planQueue == null) return;
// first we publish to the plan the current availability of resources
Resource clusterResources = scheduler.getClusterResource();
- Resource planResources =
- getPlanResources(plan, planQueue, clusterResources);
+ Resource planResources = getPlanResources(plan, planQueue,
+ clusterResources);
Set<ReservationAllocation> currentReservations =
plan.getReservationsAtTime(now);
Set<String> curReservationNames = new HashSet<String>();
@@ -98,11 +95,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
int numRes = getReservedResources(now, currentReservations,
curReservationNames, reservedResources);
// create the default reservation queue if it doesnt exist
- String defReservationId = getReservationIdFromQueueName(planQueueName)
- + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
- String defReservationQueue =
- getReservationQueueName(planQueueName, defReservationId);
- createDefaultReservationQueue(planQueueName, planQueue, defReservationId);
+ String defReservationId = getReservationIdFromQueueName(planQueueName) +
+ ReservationConstants.DEFAULT_QUEUE_SUFFIX;
+ String defReservationQueue = getReservationQueueName(planQueueName,
+ defReservationId);
+ createDefaultReservationQueue(planQueueName, planQueue,
+ defReservationId);
curReservationNames.add(defReservationId);
// if the resources dedicated to this plan has shrunk invoke replanner
boolean shouldResize = false;
@@ -151,8 +149,10 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
// sort allocations from the one giving up the most resources, to the
// one asking for the most avoid order-of-operation errors that
// temporarily violate 100% capacity bound
- List<ReservationAllocation> sortedAllocations = sortByDelta(
- new ArrayList<ReservationAllocation>(currentReservations), now, plan);
+ List<ReservationAllocation> sortedAllocations =
+ sortByDelta(
+ new ArrayList<ReservationAllocation>(currentReservations), now,
+ plan);
for (ReservationAllocation res : sortedAllocations) {
String currResId = res.getReservationId().toString();
if (curReservationNames.contains(currResId)) {
@@ -163,9 +163,10 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
if (planResources.getMemorySize() > 0
&& planResources.getVirtualCores() > 0) {
if (shouldResize) {
- capToAssign = calculateReservationToPlanProportion(
- plan.getResourceCalculator(), planResources, reservedResources,
- capToAssign);
+ capToAssign =
+ calculateReservationToPlanProportion(
+ plan.getResourceCalculator(), planResources,
+ reservedResources, capToAssign);
}
targetCapacity =
calculateReservationToPlanRatio(plan.getResourceCalculator(),
@@ -184,8 +185,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
maxCapacity = targetCapacity;
}
try {
- setQueueEntitlement(planQueueName, currResId, targetCapacity,
- maxCapacity);
+ setQueueEntitlement(planQueueName, currResId, targetCapacity, maxCapacity);
} catch (YarnException e) {
LOG.warn("Exception while trying to size reservation for plan: {}",
currResId, planQueueName, e);
@@ -196,10 +196,9 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
// compute the default queue capacity
float defQCap = 1.0f - totalAssignedCapacity;
if (LOG.isDebugEnabled()) {
- LOG.debug(
- "PlanFollowerEditPolicyTask: total Plan Capacity: {} "
- + "currReservation: {} default-queue capacity: {}",
- planResources, numRes, defQCap);
+ LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+ + "currReservation: {} default-queue capacity: {}", planResources,
+ numRes, defQCap);
}
// set the default queue to eat-up all remaining capacity
try {
@@ -226,11 +225,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
protected void setQueueEntitlement(String planQueueName, String currResId,
- float targetCapacity, float maxCapacity) throws YarnException {
- String reservationQueueName =
- getReservationQueueName(planQueueName, currResId);
- scheduler.setEntitlement(reservationQueueName,
- new QueueEntitlement(targetCapacity, maxCapacity));
+ float targetCapacity,
+ float maxCapacity) throws YarnException {
+ String reservationQueueName = getReservationQueueName(planQueueName,
+ currResId);
+ scheduler.setEntitlement(reservationQueueName, new QueueEntitlement(
+ targetCapacity, maxCapacity));
}
// Schedulers have different ways of naming queues. See YARN-2773
@@ -244,21 +244,14 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
* Then move all apps in the set of queues to the parent plan queue's default
* reservation queue if move is enabled. Finally cleanups the queue by killing
* any apps (if move is disabled or move failed) and removing the queue
- *
- * @param planQueueName the name of {@code PlanQueue}
- * @param shouldMove flag to indicate if any running apps should be moved or
- * killed
- * @param toRemove the remnant apps to clean up
- * @param defReservationQueue the default {@code ReservationQueue} of the
- * {@link Plan}
*/
- protected void cleanupExpiredQueues(String planQueueName, boolean shouldMove,
- Set<String> toRemove, String defReservationQueue) {
+ protected void cleanupExpiredQueues(String planQueueName,
+ boolean shouldMove, Set<String> toRemove, String defReservationQueue) {
for (String expiredReservationId : toRemove) {
try {
// reduce entitlement to 0
- String expiredReservation =
- getReservationQueueName(planQueueName, expiredReservationId);
+ String expiredReservation = getReservationQueueName(planQueueName,
+ expiredReservationId);
setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f);
if (shouldMove) {
moveAppsInQueueSync(expiredReservation, defReservationQueue);
@@ -282,7 +275,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
* reservation queue in a synchronous fashion
*/
private void moveAppsInQueueSync(String expiredReservation,
- String defReservationQueue) {
+ String defReservationQueue) {
List<ApplicationAttemptId> activeApps =
scheduler.getAppsInQueue(expiredReservation);
if (activeApps.isEmpty()) {
@@ -294,16 +287,16 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
} catch (YarnException e) {
LOG.warn(
- "Encountered unexpected error during migration of application: {}"
- + " from reservation: {}",
+ "Encountered unexpected error during migration of application: {}" +
+ " from reservation: {}",
app, expiredReservation, e);
}
}
}
- protected int getReservedResources(long now,
- Set<ReservationAllocation> currentReservations,
- Set<String> curReservationNames, Resource reservedResources) {
+ protected int getReservedResources(long now, Set<ReservationAllocation>
+ currentReservations, Set<String> curReservationNames,
+ Resource reservedResources) {
int numRes = 0;
if (currentReservations != null) {
numRes = currentReservations.size();
@@ -319,30 +312,23 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
* Sort in the order from the least new amount of resources asked (likely
* negative) to the highest. This prevents "order-of-operation" errors related
* to exceeding 100% capacity temporarily.
- *
- * @param currentReservations the currently active reservations
- * @param now the current time
- * @param plan the {@link Plan} that is being considered
- *
- * @return the sorted list of {@link ReservationAllocation}s
*/
protected List<ReservationAllocation> sortByDelta(
List<ReservationAllocation> currentReservations, long now, Plan plan) {
- Collections.sort(currentReservations,
- new ReservationAllocationComparator(now, this, plan));
+ Collections.sort(currentReservations, new ReservationAllocationComparator(
+ now, this, plan));
return currentReservations;
}
/**
- * Get queue associated with reservable queue named.
- *
- * @param planQueueName name of the reservable queue
+ * Get queue associated with reservable queue named
+ * @param planQueueName Name of the reservable queue
* @return queue associated with the reservable queue
*/
protected abstract Queue getPlanQueue(String planQueueName);
/**
- * Resizes reservations based on currently available resources.
+ * Resizes reservations based on currently available resources
*/
private Resource calculateReservationToPlanProportion(
ResourceCalculator rescCalculator, Resource availablePlanResources,
@@ -352,7 +338,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
/**
- * Calculates ratio of reservationResources to planResources.
+ * Calculates ratio of reservationResources to planResources
*/
private float calculateReservationToPlanRatio(
ResourceCalculator rescCalculator, Resource clusterResources,
@@ -362,7 +348,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
/**
- * Check if plan resources are less than expected reservation resources.
+ * Check if plan resources are less than expected reservation resources
*/
private boolean arePlanResourcesLessThanReservations(
ResourceCalculator rescCalculator, Resource clusterResources,
@@ -372,56 +358,38 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
/**
- * Get a list of reservation queues for this planQueue.
- *
- * @param planQueue the queue for the current {@link Plan}
- *
- * @return the queues corresponding to the reservations
+ * Get a list of reservation queues for this planQueue
*/
protected abstract List<? extends Queue> getChildReservationQueues(
Queue planQueue);
/**
- * Add a new reservation queue for reservation currResId for this planQueue.
+ * Add a new reservation queue for reservation currResId for this planQueue
*/
- protected abstract void addReservationQueue(String planQueueName, Queue queue,
- String currResId);
+ protected abstract void addReservationQueue(
+ String planQueueName, Queue queue, String currResId);
/**
- * Creates the default reservation queue for use when no reservation is used
- * for applications submitted to this planQueue.
- *
- * @param planQueueName name of the reservable queue
- * @param queue the queue for the current {@link Plan}
- * @param defReservationQueue name of the default {@code ReservationQueue}
+ * Creates the default reservation queue for use when no reservation is
+ * used for applications submitted to this planQueue
*/
- protected abstract void createDefaultReservationQueue(String planQueueName,
- Queue queue, String defReservationQueue);
+ protected abstract void createDefaultReservationQueue(
+ String planQueueName, Queue queue, String defReservationQueue);
/**
- * Get plan resources for this planQueue.
- *
- * @param plan the current {@link Plan} being considered
- * @param clusterResources the resources available in the cluster
- *
- * @return the resources allocated to the specified {@link Plan}
+ * Get plan resources for this planQueue
*/
- protected abstract Resource getPlanResources(Plan plan, Queue queue,
- Resource clusterResources);
+ protected abstract Resource getPlanResources(
+ Plan plan, Queue queue, Resource clusterResources);
/**
* Get reservation queue resources if it exists otherwise return null.
- *
- * @param plan the current {@link Plan} being considered
- * @param reservationId the identifier of the reservation
- *
- * @return the resources allocated to the specified reservation
*/
protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
ReservationId reservationId);
- private static class ReservationAllocationComparator
- implements Comparator<ReservationAllocation> {
+ private static class ReservationAllocationComparator implements
+ Comparator<ReservationAllocation> {
AbstractSchedulerPlanFollower planFollower;
long now;
Plan plan;
@@ -436,12 +404,14 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
private Resource getUnallocatedReservedResources(
ReservationAllocation reservation) {
Resource resResource;
- Resource reservationResource =
- planFollower.getReservationQueueResourceIfExists(plan,
- reservation.getReservationId());
+ Resource reservationResource = planFollower
+ .getReservationQueueResourceIfExists
+ (plan, reservation.getReservationId());
if (reservationResource != null) {
- resResource = Resources.subtract(reservation.getResourcesAtTime(now),
- reservationResource);
+ resResource =
+ Resources.subtract(
+ reservation.getResourcesAtTime(now),
+ reservationResource);
} else {
resResource = reservation.getResourcesAtTime(now);
}
@@ -458,3 +428,4 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
}
}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 9eb1820..783fd09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,10 +33,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
@@ -65,14 +64,9 @@ public class InMemoryPlan implements Plan {
private RLESparseResourceAllocation rleSparseVector;
- private PeriodicRLESparseResourceAllocation periodicRle;
-
private Map<String, RLESparseResourceAllocation> userResourceAlloc =
new HashMap<String, RLESparseResourceAllocation>();
- private Map<String, RLESparseResourceAllocation> userPeriodicResourceAlloc =
- new HashMap<String, RLESparseResourceAllocation>();
-
private Map<String, RLESparseResourceAllocation> userActiveReservationCount =
new HashMap<String, RLESparseResourceAllocation>();
@@ -102,27 +96,15 @@ public class InMemoryPlan implements Plan {
String queueName, Planner replanner, boolean getMoveOnExpiry,
RMContext rmContext) {
this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
- maxAlloc, queueName, replanner, getMoveOnExpiry,
- YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
- rmContext);
+ maxAlloc, queueName, replanner, getMoveOnExpiry, rmContext,
+ new UTCClock());
}
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry,
- long maxPeriodicty, RMContext rmContext) {
- this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
- maxAlloc, queueName, replanner, getMoveOnExpiry, maxPeriodicty,
- rmContext, new UTCClock());
- }
-
- @SuppressWarnings("checkstyle:parameternumber")
- public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
- ReservationAgent agent, Resource totalCapacity, long step,
- ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
- String queueName, Planner replanner, boolean getMoveOnExpiry,
- long maxPeriodicty, RMContext rmContext, Clock clock) {
+ RMContext rmContext, Clock clock) {
this.queueMetrics = queueMetrics;
this.policy = policy;
this.agent = agent;
@@ -132,8 +114,6 @@ public class InMemoryPlan implements Plan {
this.minAlloc = minAlloc;
this.maxAlloc = maxAlloc;
this.rleSparseVector = new RLESparseResourceAllocation(resCalc);
- this.periodicRle =
- new PeriodicRLESparseResourceAllocation(resCalc, maxPeriodicty);
this.queueName = queueName;
this.replanner = replanner;
this.getMoveOnExpiry = getMoveOnExpiry;
@@ -146,39 +126,6 @@ public class InMemoryPlan implements Plan {
return queueMetrics;
}
- private RLESparseResourceAllocation getUserRLEResourceAllocation(String user,
- long period) {
- RLESparseResourceAllocation resAlloc = null;
- if (period > 0) {
- if (userPeriodicResourceAlloc.containsKey(user)) {
- resAlloc = userPeriodicResourceAlloc.get(user);
- } else {
- resAlloc = new PeriodicRLESparseResourceAllocation(resCalc,
- periodicRle.getTimePeriod());
- userPeriodicResourceAlloc.put(user, resAlloc);
- }
- } else {
- if (userResourceAlloc.containsKey(user)) {
- resAlloc = userResourceAlloc.get(user);
- } else {
- resAlloc = new RLESparseResourceAllocation(resCalc);
- userResourceAlloc.put(user, resAlloc);
- }
- }
- return resAlloc;
- }
-
- private void gcUserRLEResourceAllocation(String user, long period) {
- if (period > 0) {
- if (userPeriodicResourceAlloc.get(user).isEmpty()) {
- userPeriodicResourceAlloc.remove(user);
- }
- } else {
- if (userResourceAlloc.get(user).isEmpty()) {
- userResourceAlloc.remove(user);
- }
- }
- }
private void incrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
@@ -186,10 +133,11 @@ public class InMemoryPlan implements Plan {
reservation.getAllocationRequests();
// check if we have encountered the user earlier and if not add an entry
String user = reservation.getUser();
- long period = reservation.getPeriodicity();
- RLESparseResourceAllocation resAlloc =
- getUserRLEResourceAllocation(user, period);
-
+ RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+ if (resAlloc == null) {
+ resAlloc = new RLESparseResourceAllocation(resCalc);
+ userResourceAlloc.put(user, resAlloc);
+ }
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
if (resCount == null) {
resCount = new RLESparseResourceAllocation(resCalc);
@@ -201,43 +149,14 @@ public class InMemoryPlan implements Plan {
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
-
- if (period > 0L) {
- for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {
-
- long rStart = r.getKey().getStartTime() + i * period;
- long rEnd = r.getKey().getEndTime() + i * period;
-
- // handle wrap-around
- if (rEnd > periodicRle.getTimePeriod()) {
- long diff = rEnd - periodicRle.getTimePeriod();
- rEnd = periodicRle.getTimePeriod();
- ReservationInterval newInterval = new ReservationInterval(0, diff);
- periodicRle.addInterval(newInterval, r.getValue());
- resAlloc.addInterval(newInterval, r.getValue());
- }
-
- ReservationInterval newInterval =
- new ReservationInterval(rStart, rEnd);
- periodicRle.addInterval(newInterval, r.getValue());
- resAlloc.addInterval(newInterval, r.getValue());
- }
-
- } else {
- rleSparseVector.addInterval(r.getKey(), r.getValue());
- resAlloc.addInterval(r.getKey(), r.getValue());
- if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
- ZERO_RESOURCE)) {
- earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
- latestActive = Math.max(latestActive, r.getKey().getEndTime());
- }
+ resAlloc.addInterval(r.getKey(), r.getValue());
+ rleSparseVector.addInterval(r.getKey(), r.getValue());
+ if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+ ZERO_RESOURCE)) {
+ earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+ latestActive = Math.max(latestActive, r.getKey().getEndTime());
}
}
- // periodic reservations are active from start time and good till cancelled
- if (period > 0L) {
- earliestActive = reservation.getStartTime();
- latestActive = Long.MAX_VALUE;
- }
resCount.addInterval(new ReservationInterval(earliestActive, latestActive),
Resource.newInstance(1, 1));
}
@@ -247,55 +166,27 @@ public class InMemoryPlan implements Plan {
Map<ReservationInterval, Resource> allocationRequests =
reservation.getAllocationRequests();
String user = reservation.getUser();
- long period = reservation.getPeriodicity();
- RLESparseResourceAllocation resAlloc =
- getUserRLEResourceAllocation(user, period);
+ RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
long earliestActive = Long.MAX_VALUE;
long latestActive = Long.MIN_VALUE;
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
- if (period > 0L) {
- for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {
-
- long rStart = r.getKey().getStartTime() + i * period;
- long rEnd = r.getKey().getEndTime() + i * period;
-
- // handle wrap-around
- if (rEnd > periodicRle.getTimePeriod()) {
- long diff = rEnd - periodicRle.getTimePeriod();
- rEnd = periodicRle.getTimePeriod();
- ReservationInterval newInterval = new ReservationInterval(0, diff);
- periodicRle.removeInterval(newInterval, r.getValue());
- resAlloc.removeInterval(newInterval, r.getValue());
- }
-
- ReservationInterval newInterval =
- new ReservationInterval(rStart, rEnd);
- periodicRle.removeInterval(newInterval, r.getValue());
- resAlloc.removeInterval(newInterval, r.getValue());
- }
- } else {
- rleSparseVector.removeInterval(r.getKey(), r.getValue());
- resAlloc.removeInterval(r.getKey(), r.getValue());
- if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
- ZERO_RESOURCE)) {
- earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
- latestActive = Math.max(latestActive, r.getKey().getEndTime());
- }
+ resAlloc.removeInterval(r.getKey(), r.getValue());
+ rleSparseVector.removeInterval(r.getKey(), r.getValue());
+ if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+ ZERO_RESOURCE)) {
+ earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+ latestActive = Math.max(latestActive, r.getKey().getEndTime());
}
}
- gcUserRLEResourceAllocation(user, period);
+ if (resAlloc.isEmpty()) {
+ userResourceAlloc.remove(user);
+ }
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
- // periodic reservations are active from start time and good till cancelled
- if (period > 0L) {
- earliestActive = reservation.getStartTime();
- latestActive = Long.MAX_VALUE;
- }
- resCount.removeInterval(
- new ReservationInterval(earliestActive, latestActive),
- Resource.newInstance(1, 1));
+ resCount.removeInterval(new ReservationInterval(earliestActive,
+ latestActive), Resource.newInstance(1, 1));
if (resCount.isEmpty()) {
userActiveReservationCount.remove(user);
}
@@ -307,9 +198,9 @@ public class InMemoryPlan implements Plan {
if (currentReservations != null) {
Set<ReservationAllocation> flattenedReservations =
new TreeSet<ReservationAllocation>();
- for (Set<InMemoryReservationAllocation> res : currentReservations
- .values()) {
- flattenedReservations.addAll(res);
+ for (Set<InMemoryReservationAllocation> reservationEntries :
+ currentReservations.values()) {
+ flattenedReservations.addAll(reservationEntries);
}
return flattenedReservations;
} else {
@@ -327,16 +218,19 @@ public class InMemoryPlan implements Plan {
InMemoryReservationAllocation inMemReservation =
(InMemoryReservationAllocation) reservation;
if (inMemReservation.getUser() == null) {
- String errMsg = "The specified Reservation with ID "
- + inMemReservation.getReservationId() + " is not mapped to any user";
+ String errMsg =
+ "The specified Reservation with ID "
+ + inMemReservation.getReservationId()
+ + " is not mapped to any user";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
writeLock.lock();
try {
if (reservationTable.containsKey(inMemReservation.getReservationId())) {
- String errMsg = "The specified Reservation with ID "
- + inMemReservation.getReservationId() + " already exists";
+ String errMsg =
+ "The specified Reservation with ID "
+ + inMemReservation.getReservationId() + " already exists";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
@@ -352,8 +246,9 @@ public class InMemoryPlan implements Plan {
getQueueName(), inMemReservation.getReservationId().toString());
}
}
- ReservationInterval searchInterval = new ReservationInterval(
- inMemReservation.getStartTime(), inMemReservation.getEndTime());
+ ReservationInterval searchInterval =
+ new ReservationInterval(inMemReservation.getStartTime(),
+ inMemReservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations == null) {
@@ -385,8 +280,9 @@ public class InMemoryPlan implements Plan {
ReservationId resId = reservation.getReservationId();
ReservationAllocation currReservation = getReservationById(resId);
if (currReservation == null) {
- String errMsg = "The specified Reservation with ID " + resId
- + " does not exist in the plan";
+ String errMsg =
+ "The specified Reservation with ID " + resId
+ + " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
@@ -422,8 +318,9 @@ public class InMemoryPlan implements Plan {
private boolean removeReservation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
- ReservationInterval searchInterval = new ReservationInterval(
- reservation.getStartTime(), reservation.getEndTime());
+ ReservationInterval searchInterval =
+ new ReservationInterval(reservation.getStartTime(),
+ reservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations != null) {
@@ -440,15 +337,16 @@ public class InMemoryPlan implements Plan {
currentReservations.remove(searchInterval);
}
} else {
- String errMsg = "The specified Reservation with ID "
- + reservation.getReservationId() + " does not exist in the plan";
+ String errMsg =
+ "The specified Reservation with ID " + reservation.getReservationId()
+ + " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
reservationTable.remove(reservation.getReservationId());
decrementAllocation(reservation);
LOG.info("Sucessfully deleted reservation: {} in plan.",
- reservation.getReservationId());
+ reservation.getReservationId());
return true;
}
@@ -458,8 +356,9 @@ public class InMemoryPlan implements Plan {
try {
ReservationAllocation reservation = getReservationById(reservationID);
if (reservation == null) {
- String errMsg = "The specified Reservation with ID " + reservationID
- + " does not exist in the plan";
+ String errMsg =
+ "The specified Reservation with ID " + reservationID
+ + " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
@@ -554,90 +453,66 @@ public class InMemoryPlan implements Plan {
long start, long end) {
readLock.lock();
try {
- // merge periodic and non-periodic allocations
RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
- RLESparseResourceAllocation userPeriodicResAlloc =
- userPeriodicResourceAlloc.get(user);
- if (userResAlloc != null && userPeriodicResAlloc != null) {
- return RLESparseResourceAllocation.merge(resCalc, totalCapacity,
- userResAlloc, userPeriodicResAlloc, RLEOperator.add, start, end);
- }
if (userResAlloc != null) {
return userResAlloc.getRangeOverlapping(start, end);
+ } else {
+ return new RLESparseResourceAllocation(resCalc);
}
- if (userPeriodicResAlloc != null) {
- return userPeriodicResAlloc.getRangeOverlapping(start, end);
- }
- } catch (PlanningException e) {
- LOG.warn("Exception while trying to merge periodic"
- + " and non-periodic user allocations: {}", e.getMessage(), e);
} finally {
readLock.unlock();
}
- return new RLESparseResourceAllocation(resCalc);
}
@Override
public Resource getTotalCommittedResources(long t) {
readLock.lock();
try {
- return Resources.add(rleSparseVector.getCapacityAtTime(t),
- periodicRle.getCapacityAtTime(t));
+ return rleSparseVector.getCapacityAtTime(t);
} finally {
readLock.unlock();
}
}
@Override
- public Set<ReservationAllocation> getReservations(ReservationId reservationID,
- ReservationInterval interval) {
+ public Set<ReservationAllocation> getReservations(ReservationId
+ reservationID, ReservationInterval interval) {
return getReservations(reservationID, interval, null);
}
@Override
- public Set<ReservationAllocation> getReservations(ReservationId reservationID,
- ReservationInterval interval, String user) {
+ public Set<ReservationAllocation> getReservations(ReservationId
+ reservationID, ReservationInterval interval, String user) {
if (reservationID != null) {
ReservationAllocation allocation = getReservationById(reservationID);
- if (allocation == null) {
+ if (allocation == null){
return Collections.emptySet();
}
return Collections.singleton(allocation);
}
- long startTime = interval == null ? 0 : interval.getStartTime();
- long endTime = interval == null ? Long.MAX_VALUE : interval.getEndTime();
+ long startTime = interval == null? 0 : interval.getStartTime();
+ long endTime = interval == null? Long.MAX_VALUE : interval.getEndTime();
ReservationInterval searchInterval =
- new ReservationInterval(endTime, Long.MAX_VALUE);
+ new ReservationInterval(endTime, Long.MAX_VALUE);
readLock.lock();
try {
- SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> res =
- currentReservations.headMap(searchInterval, true);
- if (!res.isEmpty()) {
- Set<ReservationAllocation> flattenedReservations = new HashSet<>();
- for (Set<InMemoryReservationAllocation> resEntries : res.values()) {
- for (InMemoryReservationAllocation reservation : resEntries) {
- // validate user
- if (user != null && !user.isEmpty()
- && !reservation.getUser().equals(user)) {
- continue;
- }
- // handle periodic reservations
- long period = reservation.getPeriodicity();
- if (period > 0) {
- long t = endTime % period;
- // check for both contained and wrap-around reservations
- if ((t - startTime) * (t - endTime)
- * (startTime - endTime) >= 0) {
- flattenedReservations.add(reservation);
- }
- } else {
- // check for non-periodic reservations
- if (reservation.getEndTime() > startTime) {
- flattenedReservations.add(reservation);
+ SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>>
+ reservations = currentReservations.headMap(searchInterval, true);
+ if (!reservations.isEmpty()) {
+ Set<ReservationAllocation> flattenedReservations =
+ new HashSet<>();
+ for (Set<InMemoryReservationAllocation> reservationEntries :
+ reservations.values()) {
+ for (InMemoryReservationAllocation res : reservationEntries) {
+ if (res.getEndTime() > startTime) {
+ if (user != null && !user.isEmpty()
+ && !res.getUser().equals(user)) {
+ continue;
}
+ flattenedReservations.add(res);
}
}
}
@@ -675,82 +550,36 @@ public class InMemoryPlan implements Plan {
@Override
public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
- ReservationId oldId, long start, long end, long period)
- throws PlanningException {
+ ReservationId oldId, long start, long end) throws PlanningException {
readLock.lock();
try {
-
- // for non-periodic return simple available resources
- if (period == 0) {
-
- // create RLE of totCapacity
- TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
- totAvailable.put(start, Resources.clone(totalCapacity));
- RLESparseResourceAllocation totRLEAvail =
- new RLESparseResourceAllocation(totAvailable, resCalc);
-
- // subtract used from available
- RLESparseResourceAllocation netAvailable;
-
- netAvailable = RLESparseResourceAllocation.merge(resCalc,
- Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
- RLEOperator.subtractTestNonNegative, start, end);
-
- // remove periodic component
- netAvailable = RLESparseResourceAllocation.merge(resCalc,
- Resources.clone(totalCapacity), netAvailable, periodicRle,
- RLEOperator.subtractTestNonNegative, start, end);
-
- // add back in old reservation used resources if any
- ReservationAllocation old = reservationTable.get(oldId);
- if (old != null) {
-
- RLESparseResourceAllocation addBackPrevious =
- old.getResourcesOverTime(start, end);
- netAvailable = RLESparseResourceAllocation.merge(resCalc,
- Resources.clone(totalCapacity), netAvailable, addBackPrevious,
- RLEOperator.add, start, end);
- }
- // lower it if this is needed by the sharing policy
- netAvailable = getSharingPolicy().availableResources(netAvailable, this,
- user, oldId, start, end);
- return netAvailable;
- } else {
-
- if (periodicRle.getTimePeriod() % period != 0) {
- throw new PlanningException("The reservation periodicity (" + period
- + ") must be" + "an exact divider of the system maxPeriod ("
- + periodicRle.getTimePeriod() + ")");
- }
-
- // find the minimum resources available among all the instances that fit
- // in the LCM
- long numInstInLCM = periodicRle.getTimePeriod() / period;
-
- RLESparseResourceAllocation minOverLCM =
- getAvailableResourceOverTime(user, oldId, start, end, 0);
- for (int i = 1; i < numInstInLCM; i++) {
-
- long rStart = start + i * period;
- long rEnd = end + i * period;
-
- // recursive invocation of non-periodic range (to pick raw-info)
- RLESparseResourceAllocation snapShot =
- getAvailableResourceOverTime(user, oldId, rStart, rEnd, 0);
-
- // time-align on start
- snapShot.shift(-(i * period));
-
- // pick the minimum amount of resources in each time interval
- minOverLCM =
- RLESparseResourceAllocation.merge(resCalc, getTotalCapacity(),
- minOverLCM, snapShot, RLEOperator.min, start, end);
-
- }
-
- return minOverLCM;
-
+ // create RLE of totCapacity
+ TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
+ totAvailable.put(start, Resources.clone(totalCapacity));
+ RLESparseResourceAllocation totRLEAvail =
+ new RLESparseResourceAllocation(totAvailable, resCalc);
+
+ // subtract used from available
+ RLESparseResourceAllocation netAvailable;
+
+ netAvailable =
+ RLESparseResourceAllocation.merge(resCalc,
+ Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
+ RLEOperator.subtractTestNonNegative, start, end);
+
+ // add back in old reservation used resources if any
+ ReservationAllocation old = reservationTable.get(oldId);
+ if (old != null) {
+ netAvailable =
+ RLESparseResourceAllocation.merge(resCalc,
+ Resources.clone(totalCapacity), netAvailable,
+ old.getResourcesOverTime(), RLEOperator.add, start, end);
}
+ // lower it if this is needed by the sharing policy
+ netAvailable =
+ getSharingPolicy().availableResources(netAvailable, this, user,
+ oldId, start, end);
+ return netAvailable;
} finally {
readLock.unlock();
}
@@ -808,7 +637,7 @@ public class InMemoryPlan implements Plan {
public String toCumulativeString() {
readLock.lock();
try {
- return rleSparseVector.toString() + "\n" + periodicRle.toString();
+ return rleSparseVector.toString();
} finally {
readLock.unlock();
}
@@ -860,18 +689,11 @@ public class InMemoryPlan implements Plan {
}
@Override
- public RLESparseResourceAllocation getCumulativeLoadOverTime(long start,
- long end) throws PlanningException {
+ public RLESparseResourceAllocation getCumulativeLoadOverTime(
+ long start, long end) {
readLock.lock();
try {
-
- RLESparseResourceAllocation ret =
- rleSparseVector.getRangeOverlapping(start, end);
- ret = RLESparseResourceAllocation.merge(resCalc, totalCapacity, ret,
- periodicRle.getRangeOverlapping(start, end), RLEOperator.add, start,
- end);
-
- return ret;
+ return rleSparseVector.getRangeOverlapping(start, end);
} finally {
readLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index 00c8e44..69fd43f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -42,7 +42,6 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
private final Map<ReservationInterval, Resource> allocationRequests;
private boolean hasGang = false;
private long acceptedAt = -1;
- private long periodicity = 0;
private RLESparseResourceAllocation resourcesOverTime;
@@ -68,16 +67,9 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
this.allocationRequests = allocations;
this.planName = planName;
this.hasGang = hasGang;
- if (contract != null && contract.getRecurrenceExpression() != null) {
- this.periodicity = Long.parseLong(contract.getRecurrenceExpression());
- }
- if (periodicity > 0) {
- resourcesOverTime =
- new PeriodicRLESparseResourceAllocation(calculator, periodicity);
- } else {
- resourcesOverTime = new RLESparseResourceAllocation(calculator);
- }
- for (Map.Entry<ReservationInterval, Resource> r : allocations.entrySet()) {
+ resourcesOverTime = new RLESparseResourceAllocation(calculator);
+ for (Map.Entry<ReservationInterval, Resource> r : allocations
+ .entrySet()) {
resourcesOverTime.addInterval(r.getKey(), r.getValue());
}
}
@@ -141,33 +133,17 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
}
@Override
- public RLESparseResourceAllocation getResourcesOverTime() {
+ public RLESparseResourceAllocation getResourcesOverTime(){
return resourcesOverTime;
}
@Override
- public RLESparseResourceAllocation getResourcesOverTime(long start,
- long end) {
- return resourcesOverTime.getRangeOverlapping(start, end);
- }
-
- @Override
- public long getPeriodicity() {
- return periodicity;
- }
-
- @Override
- public void setPeriodicity(long period) {
- periodicity = period;
- }
-
- @Override
public String toString() {
StringBuilder sBuf = new StringBuilder();
sBuf.append(getReservationId()).append(" user:").append(getUser())
.append(" startTime: ").append(getStartTime()).append(" endTime: ")
- .append(getEndTime()).append(" Periodiciy: ").append(periodicity)
- .append(" alloc:\n[").append(resourcesOverTime.toString()).append("] ");
+ .append(getEndTime()).append(" alloc:\n[")
+ .append(resourcesOverTime.toString()).append("] ");
return sBuf.toString();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
index 49d4702..55f1d00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -40,7 +40,7 @@ public class NoOverCommitPolicy implements SharingPolicy {
RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
reservation.getUser(), reservation.getReservationId(),
- reservation.getStartTime(), reservation.getEndTime(), 0);
+ reservation.getStartTime(), reservation.getEndTime());
// test the reservation does not exceed what is available
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
index 7bc44f5..8e3be8b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
@@ -18,94 +18,47 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
/**
- * This data structure stores a periodic {@link RLESparseResourceAllocation}.
+ * This data structure stores a periodic RLESparseResourceAllocation.
* Default period is 1 day (86400000ms).
*/
-public class PeriodicRLESparseResourceAllocation
- extends RLESparseResourceAllocation {
+public class PeriodicRLESparseResourceAllocation extends
+ RLESparseResourceAllocation {
// Log
- private static final Logger LOG =
- LoggerFactory.getLogger(PeriodicRLESparseResourceAllocation.class);
+ private static final Logger LOG = LoggerFactory
+ .getLogger(PeriodicRLESparseResourceAllocation.class);
private long timePeriod;
/**
* Constructor.
*
- * @param resourceCalculator {@link ResourceCalculator} the resource
- * calculator to use.
+ * @param rleVector {@link RLESparseResourceAllocation} with the run-length
+ encoded data.
* @param timePeriod Time period in milliseconds.
*/
public PeriodicRLESparseResourceAllocation(
- ResourceCalculator resourceCalculator, Long timePeriod) {
- super(resourceCalculator);
+ RLESparseResourceAllocation rleVector, Long timePeriod) {
+ super(rleVector.getCumulative(), rleVector.getResourceCalculator());
this.timePeriod = timePeriod;
}
/**
* Constructor. Default time period set to 1 day.
*
- * @param resourceCalculator {@link ResourceCalculator} the resource
- * calculator to use..
- */
- public PeriodicRLESparseResourceAllocation(
- ResourceCalculator resourceCalculator) {
- this(resourceCalculator,
- YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
- }
-
- /**
- * Constructor.
- *
* @param rleVector {@link RLESparseResourceAllocation} with the run-length
- * encoded data.
- * @param timePeriod Time period in milliseconds.
+ encoded data.
*/
- @VisibleForTesting
public PeriodicRLESparseResourceAllocation(
- RLESparseResourceAllocation rleVector, Long timePeriod) {
- super(rleVector.getCumulative(), rleVector.getResourceCalculator());
- this.timePeriod = timePeriod;
-
- // make sure the PeriodicRLE is zero-based, and handles wrap-around
- long delta = (getEarliestStartTime() % timePeriod - getEarliestStartTime());
- shift(delta);
-
- List<Long> toRemove = new ArrayList<>();
- Map<Long, Resource> toAdd = new TreeMap<>();
-
- for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
- if (entry.getKey() > timePeriod) {
- toRemove.add(entry.getKey());
- if (entry.getValue() != null) {
- toAdd.put(timePeriod, entry.getValue());
- long prev = entry.getKey() % timePeriod;
- toAdd.put(prev, this.getCapacityAtTime(prev));
- toAdd.put(0L, entry.getValue());
- }
- }
- }
- for (Long l : toRemove) {
- cumulativeCapacity.remove(l);
- }
- cumulativeCapacity.putAll(toAdd);
+ RLESparseResourceAllocation rleVector) {
+ this(rleVector, 86400000L);
}
/**
@@ -125,25 +78,24 @@ public class PeriodicRLESparseResourceAllocation
* The interval may include 0, but the end time must be strictly less than
* timePeriod.
*
- * @param interval {@link ReservationInterval} to which the specified resource
- * is to be added.
+ * @param interval {@link ReservationInterval} to which the specified
+ * resource is to be added.
* @param resource {@link Resource} to be added to the interval specified.
* @return true if addition is successful, false otherwise
*/
- public boolean addInterval(ReservationInterval interval, Resource resource) {
+ public boolean addInterval(ReservationInterval interval,
+ Resource resource) {
long startTime = interval.getStartTime();
long endTime = interval.getEndTime();
-
if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) {
return super.addInterval(interval, resource);
} else {
- LOG.info("Cannot set capacity beyond end time: " + timePeriod + " was ("
- + interval.toString() + ")");
+ LOG.info("Cannot set capacity beyond end time: " + timePeriod);
return false;
}
}
- /**
+ /**
* Removes a resource for the specified interval.
*
* @param interval the {@link ReservationInterval} for which the resource is
@@ -151,15 +103,14 @@ public class PeriodicRLESparseResourceAllocation
* @param resource the {@link Resource} to be removed.
* @return true if removal is successful, false otherwise
*/
- public boolean removeInterval(ReservationInterval interval,
- Resource resource) {
+ public boolean removeInterval(
+ ReservationInterval interval, Resource resource) {
long startTime = interval.getStartTime();
long endTime = interval.getEndTime();
// If the resource to be subtracted is less than the minimum resource in
// the range, abort removal to avoid negative capacity.
- // TODO revesit decrementing endTime
- if (!Resources.fitsIn(resource, getMinimumCapacityInInterval(
- new ReservationInterval(startTime, endTime - 1)))) {
+ if (!Resources.fitsIn(
+ resource, super.getMinimumCapacityInInterval(interval))) {
LOG.info("Request to remove more resources than what is available");
return false;
}
@@ -174,16 +125,17 @@ public class PeriodicRLESparseResourceAllocation
/**
* Get maximum capacity at periodic offsets from the specified time.
*
- * @param tick UTC time base from which offsets are specified for finding the
- * maximum capacity.
- * @param period periodic offset at which capacities are evaluated.
+ * @param tick UTC time base from which offsets are specified for finding
+ * the maximum capacity.
+ * @param period periodic offset at which capacities are evaluted.
* @return the maximum {@link Resource} across the specified time instants.
* @return true if removal is successful, false otherwise
*/
public Resource getMaximumPeriodicCapacity(long tick, long period) {
Resource maxResource;
if (period < timePeriod) {
- maxResource = super.getMaximumPeriodicCapacity(tick % timePeriod, period);
+ maxResource =
+ super.getMaximumPeriodicCapacity(tick % timePeriod, period);
} else {
// if period is greater than the length of PeriodicRLESparseAllocation,
// only a single value exists in this interval.
@@ -212,30 +164,4 @@ public class PeriodicRLESparseResourceAllocation
return ret.toString();
}
- @Override
- public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
- NavigableMap<Long, Resource> unrolledMap = new TreeMap<>();
- readLock.lock();
- try {
- long relativeStart = (start >= 0) ? start % timePeriod : 0;
- NavigableMap<Long, Resource> cumulativeMap = this.getCumulative();
- Long previous = cumulativeMap.floorKey(relativeStart);
- previous = (previous != null) ? previous : 0;
- for (long i = 0; i <= (end - start) / timePeriod; i++) {
- for (Map.Entry<Long, Resource> e : cumulativeMap.entrySet()) {
- long curKey = e.getKey() + (i * timePeriod);
- if (curKey >= previous && (start + curKey - relativeStart) <= end) {
- unrolledMap.put(curKey, e.getValue());
- }
- }
- }
- RLESparseResourceAllocation rle =
- new RLESparseResourceAllocation(unrolledMap, getResourceCalculator());
- rle.shift(start - relativeStart);
- return rle;
- } finally {
- readLock.unlock();
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
index 9afa324..504a250 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
@@ -28,58 +28,54 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
public interface PlanEdit extends PlanContext, PlanView {
/**
- * Add a new {@link ReservationAllocation} to the plan.
+ * Add a new {@link ReservationAllocation} to the plan
*
* @param reservation the {@link ReservationAllocation} to be added to the
* plan
* @param isRecovering flag to indicate if reservation is being added as part
* of failover or not
* @return true if addition is successful, false otherwise
- * @throws PlanningException if addition is unsuccessful
*/
- boolean addReservation(ReservationAllocation reservation,
+ public boolean addReservation(ReservationAllocation reservation,
boolean isRecovering) throws PlanningException;
/**
* Updates an existing {@link ReservationAllocation} in the plan. This is
- * required for re-negotiation.
+ * required for re-negotiation
*
* @param reservation the {@link ReservationAllocation} to be updated the plan
* @return true if update is successful, false otherwise
- * @throws PlanningException if update is unsuccessful
*/
- boolean updateReservation(ReservationAllocation reservation)
+ public boolean updateReservation(ReservationAllocation reservation)
throws PlanningException;
/**
* Delete an existing {@link ReservationAllocation} from the plan identified
* uniquely by its {@link ReservationId}. This will generally be used for
- * garbage collection.
+ * garbage collection
*
* @param reservationID the {@link ReservationAllocation} to be deleted from
* the plan identified uniquely by its {@link ReservationId}
* @return true if delete is successful, false otherwise
- * @throws PlanningException if deletion is unsuccessful
*/
- boolean deleteReservation(ReservationId reservationID)
+ public boolean deleteReservation(ReservationId reservationID)
throws PlanningException;
/**
* Method invoked to garbage collect old reservations. It cleans up expired
- * reservations that have fallen out of the sliding archival window.
+ * reservations that have fallen out of the sliding archival window
*
* @param tick the current time from which the archival window is computed
- * @throws PlanningException if archival is unsuccessful
*/
- void archiveCompletedReservations(long tick) throws PlanningException;
+ public void archiveCompletedReservations(long tick) throws PlanningException;
/**
* Sets the overall capacity in terms of {@link Resource} assigned to this
- * plan.
+ * plan
*
* @param capacity the overall capacity in terms of {@link Resource} assigned
* to this plan
*/
- void setTotalCapacity(Resource capacity);
+ public void setTotalCapacity(Resource capacity);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92d9ad77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index 4035f68..2767993 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -17,50 +17,50 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import java.util.Set;
-
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import java.util.Set;
+
/**
* This interface provides a read-only view on the allocations made in this
* plan. This methods are used for example by {@code ReservationAgent}s to
* determine the free resources in a certain point in time, and by
* PlanFollowerPolicy to publish this plan to the scheduler.
*/
-interface PlanView extends PlanContext {
+public interface PlanView extends PlanContext {
/**
* Return a set of {@link ReservationAllocation} identified by the user who
* made the reservation.
*
* @param reservationID the unqiue id to identify the
- * {@link ReservationAllocation}
+ * {@link ReservationAllocation}
* @param interval the time interval used to retrieve the reservation
- * allocations from. Only reservations with start time no greater
- * than the interval end time, and end time no less than the interval
- * start time will be selected.
+ * allocations from. Only reservations with start time no
+ * greater than the interval end time, and end time no less
+ * than the interval start time will be selected.
* @param user the user to retrieve the reservation allocation from.
* @return a set of {@link ReservationAllocation} identified by the user who
- * made the reservation
+ * made the reservation
*/
- Set<ReservationAllocation> getReservations(ReservationId reservationID,
- ReservationInterval interval, String user);
+ Set<ReservationAllocation> getReservations(ReservationId
+ reservationID, ReservationInterval interval, String user);
/**
* Return a set of {@link ReservationAllocation} identified by any user.
*
* @param reservationID the unqiue id to identify the
- * {@link ReservationAllocation}
+ * {@link ReservationAllocation}
* @param interval the time interval used to retrieve the reservation
- * allocations from. Only reservations with start time no greater
- * than the interval end time, and end time no less than the interval
- * start time will be selected.
+ * allocations from. Only reservations with start time no
+ * greater than the interval end time, and end time no less
+ * than the interval start time will be selected.
* @return a set of {@link ReservationAllocation} identified by any user
*/
Set<ReservationAllocation> getReservations(ReservationId reservationID,
- ReservationInterval interval);
+ ReservationInterval interval);
/**
* Return a {@link ReservationAllocation} identified by its
@@ -70,7 +70,7 @@ interface PlanView extends PlanContext {
* {@link ReservationAllocation}
* @return {@link ReservationAllocation} identified by the specified id
*/
- ReservationAllocation getReservationById(ReservationId reservationID);
+ public ReservationAllocation getReservationById(ReservationId reservationID);
/**
* Return a set of {@link ReservationAllocation} that belongs to a certain
@@ -78,10 +78,11 @@ interface PlanView extends PlanContext {
*
* @param user the user being considered
* @param t the instant in time being considered
- * @return set of active {@link ReservationAllocation}s for this user at this
- * time
+ * @return set of active {@link ReservationAllocation}s for this
+ * user at this time
*/
- Set<ReservationAllocation> getReservationByUserAtTime(String user, long t);
+ public Set<ReservationAllocation> getReservationByUserAtTime(String user,
+ long t);
/**
* Gets all the active reservations at the specified point of time
@@ -90,14 +91,14 @@ interface PlanView extends PlanContext {
* requested
* @return set of active reservations at the specified time
*/
- Set<ReservationAllocation> getReservationsAtTime(long tick);
+ public Set<ReservationAllocation> getReservationsAtTime(long tick);
/**
* Gets all the reservations in the plan
*
* @return set of all reservations handled by this Plan
*/
- Set<ReservationAllocation> getAllReservations();
+ public Set<ReservationAllocation> getAllReservations();
/**
* Returns the total {@link Resource} reserved for all users at the specified
@@ -125,68 +126,61 @@ interface PlanView extends PlanContext {
*
* @return the time (UTC in ms) at which the first reservation starts
*/
- long getEarliestStartTime();
+ public long getEarliestStartTime();
/**
* Returns the time (UTC in ms) at which the last reservation terminates
*
* @return the time (UTC in ms) at which the last reservation terminates
*/
- long getLastEndTime();
+ public long getLastEndTime();
/**
* This method returns the amount of resources available to a given user
* (optionally if removing a certain reservation) over the start-end time
- * range. If the request is periodic (period is non-zero) we return the
- * minimum amount of resources available to periodic reservations (in all
- * "period" windows within the system maxPeriod / LCM).
+ * range.
*
- * @param user the user being considered
- * @param oldId the identifier of the existing reservation
- * @param start start of the time interval.
- * @param end end of the time interval.
- * @param period the ms periodicty for this request (loop and pick min till
- * maxPeriodicity)
+ * @param user
+ * @param oldId
+ * @param start
+ * @param end
* @return a view of the plan as it is available to this user
- * @throws PlanningException if operation is unsuccessful
+ * @throws PlanningException
*/
- RLESparseResourceAllocation getAvailableResourceOverTime(String user,
- ReservationId oldId, long start, long end, long period)
- throws PlanningException;
+ public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
+ ReservationId oldId, long start, long end) throws PlanningException;
/**
* This method returns a RLE encoded view of the user reservation count
* utilization between start and end time.
*
- * @param user the user being considered
- * @param start start of the time interval.
- * @param end end of the time interval.
+ * @param user
+ * @param start
+ * @param end
* @return RLE encoded view of reservation used over time
*/
- RLESparseResourceAllocation getReservationCountForUserOverTime(String user,
- long start, long end);
+ public RLESparseResourceAllocation getReservationCountForUserOverTime(
+ String user, long start, long end);
/**
* This method returns a RLE encoded view of the user reservation utilization
* between start and end time.
*
- * @param user the user being considered
- * @param start start of the time interval.
- * @param end end of the time interval.
+ * @param user
+ * @param start
+ * @param end
* @return RLE encoded view of resources used over time
*/
- RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
+ public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
long start, long end);
/**
* Get the cumulative load over a time interval.
*
- * @param start start of the time interval.
- * @param end end of the time interval.
+ * @param start Start of the time interval.
+ * @param end End of the time interval.
* @return RLE sparse allocation.
- * @throws PlanningException if operation is unsuccessful
*/
- RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end)
- throws PlanningException;
+ RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org