You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/08 20:57:42 UTC
[10/45] hadoop git commit: YARN-5330. SharingPolicy enhancements
required to support recurring reservations in ReservationSystem. (Carlo
Curino via Subru).
YARN-5330. SharingPolicy enhancements required to support recurring reservations in ReservationSystem. (Carlo Curino via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa613750
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa613750
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa613750
Branch: refs/heads/HDFS-10467
Commit: fa6137501c1499ae33f6e0e2adc31671a7e782dc
Parents: 56d93d2
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Sep 7 19:07:17 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Sep 7 19:07:17 2017 -0700
----------------------------------------------------------------------
.../reservation/CapacityOverTimePolicy.java | 32 +-
.../reservation/NoOverCommitPolicy.java | 8 +-
.../reservation/BaseSharingPolicyTest.java | 189 +++++++++++
.../reservation/ReservationSystemTestUtil.java | 28 +-
.../reservation/TestCapacityOverTimePolicy.java | 339 +++++--------------
.../reservation/TestNoOverCommitPolicy.java | 185 +++-------
6 files changed, 388 insertions(+), 393 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa613750/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
index bb1a4e8..acd5774 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -95,26 +95,29 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
throw new PlanningQuotaException(p);
}
+ long checkStart = reservation.getStartTime() - validWindow;
+ long checkEnd = reservation.getEndTime() + validWindow;
+
//---- check for integral violations of capacity --------
// Gather a view of what to check (curr allocation of user, minus old
// version of this reservation, plus new version)
RLESparseResourceAllocation consumptionForUserOverTime =
plan.getConsumptionForUserOverTime(reservation.getUser(),
- reservation.getStartTime() - validWindow,
- reservation.getEndTime() + validWindow);
+ checkStart, checkEnd);
ReservationAllocation old =
plan.getReservationById(reservation.getReservationId());
if (old != null) {
- consumptionForUserOverTime = RLESparseResourceAllocation
- .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
- consumptionForUserOverTime, old.getResourcesOverTime(),
- RLEOperator.add, reservation.getStartTime() - validWindow,
- reservation.getEndTime() + validWindow);
+ consumptionForUserOverTime =
+ RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+ plan.getTotalCapacity(), consumptionForUserOverTime,
+ old.getResourcesOverTime(checkStart, checkEnd), RLEOperator.add,
+ checkStart, checkEnd);
}
- RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime();
+ RLESparseResourceAllocation resRLE =
+ reservation.getResourcesOverTime(checkStart, checkEnd);
RLESparseResourceAllocation toCheck = RLESparseResourceAllocation
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
@@ -191,11 +194,11 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
// compare using merge() limit with integral
try {
- RLESparseResourceAllocation
- .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
- targetLimit, integral, RLEOperator.subtractTestNonNegative,
- reservation.getStartTime() - validWindow,
- reservation.getEndTime() + validWindow);
+
+ RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+ plan.getTotalCapacity(), targetLimit, integral,
+ RLEOperator.subtractTestNonNegative, checkStart, checkEnd);
+
} catch (PlanningException p) {
throw new PlanningQuotaException(
"Integral (avg over time) quota capacity " + maxAvg
@@ -240,7 +243,8 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
if (old != null) {
used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
Resources.clone(plan.getTotalCapacity()), used,
- old.getResourcesOverTime(), RLEOperator.subtract, start, end);
+ old.getResourcesOverTime(start, end), RLEOperator.subtract, start,
+ end);
}
instRLEQuota = RLESparseResourceAllocation
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa613750/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
index 49d4702..98ef582 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -40,13 +40,17 @@ public class NoOverCommitPolicy implements SharingPolicy {
RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
reservation.getUser(), reservation.getReservationId(),
- reservation.getStartTime(), reservation.getEndTime(), 0);
+ reservation.getStartTime(), reservation.getEndTime(),
+ reservation.getPeriodicity());
// test the reservation does not exceed what is available
try {
+
+ RLESparseResourceAllocation ask = reservation.getResourcesOverTime(
+ reservation.getStartTime(), reservation.getEndTime());
RLESparseResourceAllocation
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
- available, reservation.getResourcesOverTime(),
+ available, ask,
RLESparseResourceAllocation.RLEOperator.subtractTestNonNegative,
reservation.getStartTime(), reservation.getEndTime());
} catch (PlanningException p) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa613750/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
new file mode 100644
index 0000000..294564a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
@@ -0,0 +1,189 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import net.jcip.annotations.NotThreadSafe;
+
+/**
+ * This class is a base test for {@code SharingPolicy} implementors.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
+public abstract class BaseSharingPolicyTest {
+
+ @Parameterized.Parameter(value = 0)
+ public long duration;
+
+ @Parameterized.Parameter(value = 1)
+ public double height;
+
+ @Parameterized.Parameter(value = 2)
+ public int numSubmissions;
+
+ @Parameterized.Parameter(value = 3)
+ public String recurrenceExpression;
+
+ @Parameterized.Parameter(value = 4)
+ public Class expectedError;
+
+ private long step;
+ private long initTime;
+
+ private InMemoryPlan plan;
+ private ReservationAgent mAgent;
+ private Resource minAlloc;
+ private ResourceCalculator res;
+ private Resource maxAlloc;
+
+ private int totCont = 1000;
+
+ protected ReservationSchedulerConfiguration conf;
+
+ @Before
+ public void setup() {
+ // 1 sec step
+ step = 1000L;
+ initTime = System.currentTimeMillis();
+
+ minAlloc = Resource.newInstance(1024, 1);
+ res = new DefaultResourceCalculator();
+ maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+ mAgent = mock(ReservationAgent.class);
+
+ QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
+ Resource clusterResource =
+ ReservationSystemTestUtil.calculateClusterResource(totCont);
+
+ // invoke implementors initialization of policy
+ SharingPolicy policy = getInitializedPolicy();
+
+ RMContext context = ReservationSystemTestUtil.createMockRMContext();
+
+ plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource,
+ step, res, minAlloc, maxAlloc, "dedicated", null, true, context);
+ }
+
+ public void runTest() throws IOException, PlanningException {
+
+ long period = 1;
+ if (recurrenceExpression != null) {
+ period = Long.parseLong(recurrenceExpression);
+ }
+
+ try {
+ RLESparseResourceAllocation rle = generateRLEAlloc(period);
+
+ // Generate the intervalMap (trimming out-of-period entries)
+ Map<ReservationInterval, Resource> reservationIntervalResourceMap;
+ if (period > 1) {
+ rle = new PeriodicRLESparseResourceAllocation(rle, period);
+ reservationIntervalResourceMap =
+ ReservationSystemTestUtil.toAllocation(rle, 0, period);
+ } else {
+ reservationIntervalResourceMap = ReservationSystemTestUtil
+ .toAllocation(rle, Long.MIN_VALUE, Long.MAX_VALUE);
+ }
+
+ ReservationDefinition rDef =
+ ReservationSystemTestUtil.createSimpleReservationDefinition(
+ initTime % period, initTime % period + duration + 1, duration, 1,
+ recurrenceExpression);
+
+ // perform multiple submissions where required
+ for (int i = 0; i < numSubmissions; i++) {
+
+ long rstart = rle.getEarliestStartTime();
+ long rend = rle.getLatestNonNullTime();
+
+ InMemoryReservationAllocation resAlloc =
+ new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
+ "dedicated", rstart, rend, reservationIntervalResourceMap, res,
+ minAlloc);
+
+ assertTrue(plan.toString(), plan.addReservation(resAlloc, false));
+ }
+ // fail if error was expected
+ if (expectedError != null) {
+ System.out.println(plan.toString());
+ fail();
+ }
+ } catch (Exception e) {
+ if (expectedError == null || !e.getClass().getCanonicalName()
+ .equals(expectedError.getCanonicalName())) {
+ // fail on unexpected errors
+ throw e;
+ }
+ }
+ }
+
+ private RLESparseResourceAllocation generateRLEAlloc(long period) {
+ RLESparseResourceAllocation rle =
+ new RLESparseResourceAllocation(new DefaultResourceCalculator());
+
+ Resource alloc = Resources.multiply(minAlloc, height * totCont);
+
+ // loop in case the periodicity of the reservation is smaller than LCM
+ long rStart = initTime % period;
+ long rEnd = initTime % period + duration;
+
+
+ // handle wrap-around
+ if (period > 1 && rEnd > period) {
+ long diff = rEnd - period;
+ rEnd = period;
+
+ // handle multiple wrap-arounds (e.g., 5h duration on a 2h periodicity)
+ if(duration > period) {
+ rle.addInterval(new ReservationInterval(0, period),
+ Resources.multiply(alloc, duration / period - 1));
+ rle.addInterval(new ReservationInterval(0, diff % period), alloc);
+ } else {
+ rle.addInterval(new ReservationInterval(0, diff), alloc);
+ }
+ }
+
+
+ rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
+ return rle;
+ }
+
+ public abstract SharingPolicy getInitializedPolicy();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa613750/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index 5337e06..eef86a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *******************************************************************************/
+ *****************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Matchers.any;
@@ -466,4 +466,28 @@ public class ReservationSystemTestUtil {
public static Resource calculateClusterResource(int numContainers) {
return Resource.newInstance(numContainers * 1024, numContainers);
}
+
+
+ public static Map<ReservationInterval, Resource> toAllocation(
+ RLESparseResourceAllocation rle, long start, long end) {
+ Map<ReservationInterval, Resource> resAlloc = new TreeMap<>();
+
+ for (Map.Entry<Long, Resource> e : rle.getCumulative().entrySet()) {
+ Long nextKey = rle.getCumulative().higherKey(e.getKey());
+ if (nextKey == null) {
+ break;
+ } else {
+ if (e.getKey() >= start && e.getKey() <= end && nextKey >= start
+ && nextKey <= end) {
+ resAlloc.put(new ReservationInterval(e.getKey(), nextKey),
+ e.getValue());
+ }
+ }
+ }
+
+ return resAlloc;
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa613750/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 2dee60c..d054d3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,269 +17,118 @@
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.Arrays;
+import java.util.Collection;
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * This class tests the {@code CapacityOvertimePolicy} sharing policy.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
+public class TestCapacityOverTimePolicy extends BaseSharingPolicyTest {
+
+ final static long ONEDAY = 86400 * 1000;
+ final static long ONEHOUR = 3600 * 1000;
+ final static long ONEMINUTE = 60 * 1000;
+ final static String TWODAYPERIOD = "7200000";
+ final static String ONEDAYPERIOD = "86400000";
+
+ @Parameterized.Parameters(name = "Duration {0}, height {1}," +
+ " submission {2}, periodic {3})")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+
+ // easy fit
+ {ONEHOUR, 0.25, 1, null, null },
+ {ONEHOUR, 0.25, 1, TWODAYPERIOD, null },
+ {ONEHOUR, 0.25, 1, ONEDAYPERIOD, null },
+
+ // instantaneous high, but fit integral and inst limits
+ {ONEMINUTE, 0.74, 1, null, null },
+ {ONEMINUTE, 0.74, 1, TWODAYPERIOD, null },
+ {ONEMINUTE, 0.74, 1, ONEDAYPERIOD, null },
+
+ // barely fit
+ {ONEHOUR, 0.76, 1, null, PlanningQuotaException.class },
+ {ONEHOUR, 0.76, 1, TWODAYPERIOD, PlanningQuotaException.class },
+ {ONEHOUR, 0.76, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+ // overcommit with single reservation
+ {ONEHOUR, 1.1, 1, null, PlanningQuotaException.class },
+ {ONEHOUR, 1.1, 1, TWODAYPERIOD, PlanningQuotaException.class },
+ {ONEHOUR, 1.1, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+ // barely fit with multiple reservations (instantaneously, lowering to
+ // 1min to fit integral)
+ {ONEMINUTE, 0.25, 3, null, null },
+ {ONEMINUTE, 0.25, 3, TWODAYPERIOD, null },
+ {ONEMINUTE, 0.25, 3, ONEDAYPERIOD, null },
+
+ // overcommit with multiple reservations (instantaneously)
+ {ONEMINUTE, 0.25, 4, null, PlanningQuotaException.class },
+ {ONEMINUTE, 0.25, 4, TWODAYPERIOD, PlanningQuotaException.class },
+ {ONEMINUTE, 0.25, 4, ONEDAYPERIOD, PlanningQuotaException.class },
+
+ // (non-periodic) reservation longer than window
+ {25 * ONEHOUR, 0.25, 1, null, PlanningQuotaException.class },
+ {25 * ONEHOUR, 0.25, 1, TWODAYPERIOD, PlanningQuotaException.class },
+ {25 * ONEHOUR, 0.25, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+ // (non-periodic) reservation longer than window
+ {25 * ONEHOUR, 0.05, 5, null, PlanningQuotaException.class },
+ {25 * ONEHOUR, 0.05, 5, TWODAYPERIOD, PlanningQuotaException.class },
+ {25 * ONEHOUR, 0.05, 5, ONEDAYPERIOD, PlanningQuotaException.class },
+
+ // overcommit integral
+ {ONEDAY, 0.26, 1, null, PlanningQuotaException.class },
+ {2 * ONEHOUR, 0.26, 1, TWODAYPERIOD, PlanningQuotaException.class },
+ {2 * ONEDAY, 0.26, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+ // overcommit integral
+ {ONEDAY / 2, 0.51, 1, null, PlanningQuotaException.class },
+ {2 * ONEHOUR / 2, 0.51, 1, TWODAYPERIOD,
+ PlanningQuotaException.class },
+ {2 * ONEDAY / 2, 0.51, 1, ONEDAYPERIOD, PlanningQuotaException.class }
+
+ });
+ }
-public class TestCapacityOverTimePolicy {
-
- long timeWindow;
- long step;
- float avgConstraint;
- float instConstraint;
- long initTime;
-
- InMemoryPlan plan;
- ReservationAgent mAgent;
- Resource minAlloc;
- ResourceCalculator res;
- Resource maxAlloc;
-
- int totCont = 1000000;
-
- @Before
- public void setup() throws Exception {
+ @Override
+ public SharingPolicy getInitializedPolicy() {
// 24h window
- timeWindow = 86400000L;
+ long timeWindow = 86400000L;
+
// 1 sec step
- step = 1000L;
+ long step = 1000L;
// 25% avg cap on capacity
- avgConstraint = 25;
+ float avgConstraint = 25;
// 70% instantaneous cap on capacity
- instConstraint = 70;
+ float instConstraint = 75;
- initTime = System.currentTimeMillis();
- minAlloc = Resource.newInstance(1024, 1);
- res = new DefaultResourceCalculator();
- maxAlloc = Resource.newInstance(1024 * 8, 8);
-
- mAgent = mock(ReservationAgent.class);
- QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
- Resource clusterResource =
- ReservationSystemTestUtil.calculateClusterResource(totCont);
- ReservationSchedulerConfiguration conf =
- ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
- instConstraint, avgConstraint);
+ conf = ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
+ instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, conf);
- RMContext context = ReservationSystemTestUtil.createMockRMContext();
-
- plan =
- new InMemoryPlan(rootQueueMetrics, policy, mAgent,
- clusterResource, step, res, minAlloc, maxAlloc,
- "dedicated", null, true, context);
- }
-
- public int[] generateData(int length, int val) {
- int[] data = new int[length];
- for (int i = 0; i < length; i++) {
- data[i] = val;
- }
- return data;
- }
-
- @Test
- public void testSimplePass() throws IOException, PlanningException {
- // generate allocation that simply fit within all constraints
- int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
-
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- }
-
- @Test(expected = PlanningException.class)
- public void testAllocationLargerThanValidWindow() throws IOException,
- PlanningException {
- // generate allocation that exceed the validWindow
- int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont));
-
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
+ return policy;
}
@Test
- public void testSimplePass2() throws IOException, PlanningException {
- // generate allocation from single tenant that exceed avg momentarily but
- // fit within
- // max instantanesou
- int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- }
-
- @Test
- public void testMultiTenantPass() throws IOException, PlanningException {
- // generate allocation from multiple tenants that barely fit in tot capacity
- int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- for (int i = 0; i < 4; i++) {
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- }
- }
-
- @Test(expected = PlanningQuotaException.class)
- public void testMultiTenantFail() throws IOException, PlanningException {
- // generate allocation from multiple tenants that exceed tot capacity
- int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- for (int i = 0; i < 5; i++) {
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- }
- }
-
- @Test(expected = PlanningQuotaException.class)
- public void testInstFail() throws IOException, PlanningException {
- // generate allocation that exceed the instantaneous cap single-show
- int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- Assert.fail("should not have accepted this");
- }
-
- @Test
- public void testInstFailBySum() throws IOException, PlanningException {
- // generate allocation that exceed the instantaneous cap by sum
- int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- try {
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- Assert.fail();
- } catch (PlanningQuotaException p) {
- // expected
- }
- }
-
- @Test(expected = PlanningQuotaException.class)
- public void testFailAvg() throws IOException, PlanningException {
- // generate an allocation which violates the 25% average single-shot
- Map<ReservationInterval, Resource> req =
- new TreeMap<ReservationInterval, Resource>();
- long win = timeWindow / 2 + 100;
- int cont = (int) Math.ceil(0.5 * totCont);
- req.put(new ReservationInterval(initTime, initTime + win),
- ReservationSystemUtil.toResource(
- ReservationRequest.newInstance(Resource.newInstance(1024, 1),
- cont)));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + win, win);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + win, req, res, minAlloc), false));
- }
-
- @Test
- public void testFailAvgBySum() throws IOException, PlanningException {
- // generate an allocation which violates the 25% average by sum
- Map<ReservationInterval, Resource> req =
- new TreeMap<ReservationInterval, Resource>();
- long win = 86400000 / 4 + 1;
- int cont = (int) Math.ceil(0.5 * totCont);
- req.put(new ReservationInterval(initTime, initTime + win),
- ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
- .newInstance(1024, 1), cont)));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + win, win);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + win, req, res, minAlloc), false));
-
- try {
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), null, "u1",
- "dedicated", initTime, initTime + win, req, res, minAlloc), false));
-
- Assert.fail("should not have accepted this");
- } catch (PlanningQuotaException e) {
- // expected
- }
+ public void testAllocation() throws IOException, PlanningException {
+ runTest();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa613750/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
index c5edaf0..accdf24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,145 +17,70 @@
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.junit.Before;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * This clas tests {@code NoOverCommitPolicy} sharing policy.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
+public class TestNoOverCommitPolicy extends BaseSharingPolicyTest {
+
+ final static long ONEHOUR = 3600 * 1000;
+ final static String TWOHOURPERIOD = "7200000";
+
+ @Parameterized.Parameters(name = "Duration {0}, height {1}," +
+ " submissions {2}, periodic {3})")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+
+ // easy fit
+ {ONEHOUR, 0.25, 1, null, null },
+ {ONEHOUR, 0.25, 1, TWOHOURPERIOD, null },
+
+ // barely fit
+ {ONEHOUR, 1, 1, null, null },
+ {ONEHOUR, 1, 1, TWOHOURPERIOD, null },
+
+ // overcommit with single reservation
+ {ONEHOUR, 1.1, 1, null, ResourceOverCommitException.class },
+ {ONEHOUR, 1.1, 1, TWOHOURPERIOD, ResourceOverCommitException.class },
+
+ // barely fit with multiple reservations
+ {ONEHOUR, 0.25, 4, null, null },
+ {ONEHOUR, 0.25, 4, TWOHOURPERIOD, null },
+
+ // overcommit with multiple reservations
+ {ONEHOUR, 0.25, 5, null, ResourceOverCommitException.class },
+ {ONEHOUR, 0.25, 5, TWOHOURPERIOD, ResourceOverCommitException.class }
+
+ });
+ }
-public class TestNoOverCommitPolicy {
-
- long step;
- long initTime;
-
- InMemoryPlan plan;
- ReservationAgent mAgent;
- Resource minAlloc;
- ResourceCalculator res;
- Resource maxAlloc;
-
- int totCont = 1000000;
-
- @Before
- public void setup() throws Exception {
-
- // 1 sec step
- step = 1000L;
-
- initTime = System.currentTimeMillis();
- minAlloc = Resource.newInstance(1024, 1);
- res = new DefaultResourceCalculator();
- maxAlloc = Resource.newInstance(1024 * 8, 8);
-
- mAgent = mock(ReservationAgent.class);
+ @Override
+ public SharingPolicy getInitializedPolicy() {
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
- QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
- Resource clusterResource =
- ReservationSystemTestUtil.calculateClusterResource(totCont);
- ReservationSchedulerConfiguration conf = mock
- (ReservationSchedulerConfiguration.class);
- NoOverCommitPolicy policy = new NoOverCommitPolicy();
+ conf = new CapacitySchedulerConfiguration();
+ SharingPolicy policy = new NoOverCommitPolicy();
policy.init(reservationQ, conf);
- RMContext context = ReservationSystemTestUtil.createMockRMContext();
-
- plan =
- new InMemoryPlan(rootQueueMetrics, policy, mAgent,
- clusterResource, step, res, minAlloc, maxAlloc,
- "dedicated", null, true, context);
- }
-
- public int[] generateData(int length, int val) {
- int[] data = new int[length];
- for (int i = 0; i < length; i++) {
- data[i] = val;
- }
- return data;
- }
-
- @Test
- public void testSingleUserEasyFitPass() throws IOException, PlanningException {
- // generate allocation that easily fit within resource constraints
- int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- }
-
- @Test
- public void testSingleUserBarelyFitPass() throws IOException,
- PlanningException {
- // generate allocation from single tenant that barely fit
- int[] f = generateData(3600, totCont);
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- }
-
- @Test(expected = ResourceOverCommitException.class)
- public void testSingleFail() throws IOException, PlanningException {
- // generate allocation from single tenant that exceed capacity
- int[] f = generateData(3600, (int) (1.1 * totCont));
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), null, "u1",
- "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
- .generateAllocation(initTime, step, f), res, minAlloc), false);
+ return policy;
}
@Test
- public void testMultiTenantPass() throws IOException, PlanningException {
- // generate allocation from multiple tenants that barely fit in tot capacity
- int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- for (int i = 0; i < 4; i++) {
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- }
+ public void testAllocation() throws IOException, PlanningException {
+ runTest();
}
- @Test(expected = ResourceOverCommitException.class)
- public void testMultiTenantFail() throws IOException, PlanningException {
- // generate allocation from multiple tenants that exceed tot capacity
- int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
- ReservationDefinition rDef =
- ReservationSystemTestUtil.createSimpleReservationDefinition(
- initTime, initTime + f.length + 1, f.length);
- for (int i = 0; i < 5; i++) {
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
- "dedicated", initTime, initTime + f.length,
- ReservationSystemTestUtil.generateAllocation(initTime, step, f),
- res, minAlloc), false));
- }
- }
-}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org