You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/08 21:46:22 UTC

[2/2] hadoop git commit: YARN-5330. SharingPolicy enhancements required to support recurring reservations in ReservationSystem. (Carlo Curino via Subru).

YARN-5330. SharingPolicy enhancements required to support recurring reservations in ReservationSystem. (Carlo Curino via Subru).

(cherry picked from commit fa6137501c1499ae33f6e0e2adc31671a7e782dc)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2c051dbe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2c051dbe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2c051dbe

Branch: refs/heads/branch-2
Commit: 2c051dbe9edf1fa002090205a58ba999914acb0b
Parents: 2f88255
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Sep 7 19:07:17 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Fri Sep 8 14:44:46 2017 -0700

----------------------------------------------------------------------
 .../reservation/CapacityOverTimePolicy.java     |  32 +-
 .../reservation/NoOverCommitPolicy.java         |   8 +-
 .../reservation/BaseSharingPolicyTest.java      | 189 +++++++++++
 .../reservation/ReservationSystemTestUtil.java  |  28 +-
 .../reservation/TestCapacityOverTimePolicy.java | 339 +++++--------------
 .../reservation/TestNoOverCommitPolicy.java     | 185 +++-------
 6 files changed, 388 insertions(+), 393 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
index bb1a4e8..acd5774 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -95,26 +95,29 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
       throw new PlanningQuotaException(p);
     }
 
+    long checkStart = reservation.getStartTime() - validWindow;
+    long checkEnd = reservation.getEndTime() + validWindow;
+
     //---- check for integral violations of capacity --------
 
     // Gather a view of what to check (curr allocation of user, minus old
     // version of this reservation, plus new version)
     RLESparseResourceAllocation consumptionForUserOverTime =
         plan.getConsumptionForUserOverTime(reservation.getUser(),
-            reservation.getStartTime() - validWindow,
-            reservation.getEndTime() + validWindow);
+            checkStart, checkEnd);
 
     ReservationAllocation old =
         plan.getReservationById(reservation.getReservationId());
     if (old != null) {
-      consumptionForUserOverTime = RLESparseResourceAllocation
-          .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
-              consumptionForUserOverTime, old.getResourcesOverTime(),
-              RLEOperator.add, reservation.getStartTime() - validWindow,
-              reservation.getEndTime() + validWindow);
+      consumptionForUserOverTime =
+          RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+              plan.getTotalCapacity(), consumptionForUserOverTime,
+              old.getResourcesOverTime(checkStart, checkEnd), RLEOperator.add,
+              checkStart, checkEnd);
     }
 
-    RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime();
+    RLESparseResourceAllocation resRLE =
+        reservation.getResourcesOverTime(checkStart, checkEnd);
 
     RLESparseResourceAllocation toCheck = RLESparseResourceAllocation
         .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
@@ -191,11 +194,11 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
 
     // compare using merge() limit with integral
     try {
-      RLESparseResourceAllocation
-          .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
-              targetLimit, integral, RLEOperator.subtractTestNonNegative,
-              reservation.getStartTime() - validWindow,
-              reservation.getEndTime() + validWindow);
+
+      RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+          plan.getTotalCapacity(), targetLimit, integral,
+          RLEOperator.subtractTestNonNegative, checkStart, checkEnd);
+
     } catch (PlanningException p) {
       throw new PlanningQuotaException(
           "Integral (avg over time) quota capacity " + maxAvg
@@ -240,7 +243,8 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
     if (old != null) {
       used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
           Resources.clone(plan.getTotalCapacity()), used,
-          old.getResourcesOverTime(), RLEOperator.subtract, start, end);
+          old.getResourcesOverTime(start, end), RLEOperator.subtract, start,
+          end);
     }
 
     instRLEQuota = RLESparseResourceAllocation

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
index 49d4702..98ef582 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -40,13 +40,17 @@ public class NoOverCommitPolicy implements SharingPolicy {
 
     RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
         reservation.getUser(), reservation.getReservationId(),
-        reservation.getStartTime(), reservation.getEndTime(), 0);
+        reservation.getStartTime(), reservation.getEndTime(),
+        reservation.getPeriodicity());
 
     // test the reservation does not exceed what is available
     try {
+
+      RLESparseResourceAllocation ask = reservation.getResourcesOverTime(
+              reservation.getStartTime(), reservation.getEndTime());
       RLESparseResourceAllocation
           .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
-              available, reservation.getResourcesOverTime(),
+              available, ask,
               RLESparseResourceAllocation.RLEOperator.subtractTestNonNegative,
               reservation.getStartTime(), reservation.getEndTime());
     } catch (PlanningException p) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
new file mode 100644
index 0000000..294564a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
@@ -0,0 +1,189 @@
+/*******************************************************************************
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import net.jcip.annotations.NotThreadSafe;
+
+/**
+ * This class is a base test for {@code SharingPolicy} implementors.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
+public abstract class BaseSharingPolicyTest {
+
+  @Parameterized.Parameter(value = 0)
+  public long duration;
+
+  @Parameterized.Parameter(value = 1)
+  public double height;
+
+  @Parameterized.Parameter(value = 2)
+  public int numSubmissions;
+
+  @Parameterized.Parameter(value = 3)
+  public String recurrenceExpression;
+
+  @Parameterized.Parameter(value = 4)
+  public Class expectedError;
+
+  private long step;
+  private long initTime;
+
+  private InMemoryPlan plan;
+  private ReservationAgent mAgent;
+  private Resource minAlloc;
+  private ResourceCalculator res;
+  private Resource maxAlloc;
+
+  private int totCont = 1000;
+
+  protected ReservationSchedulerConfiguration conf;
+
+  @Before
+  public void setup() {
+    // 1 sec step
+    step = 1000L;
+    initTime = System.currentTimeMillis();
+
+    minAlloc = Resource.newInstance(1024, 1);
+    res = new DefaultResourceCalculator();
+    maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+    mAgent = mock(ReservationAgent.class);
+
+    QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
+    Resource clusterResource =
+        ReservationSystemTestUtil.calculateClusterResource(totCont);
+
+    // invoke implementors initialization of policy
+    SharingPolicy policy = getInitializedPolicy();
+
+    RMContext context = ReservationSystemTestUtil.createMockRMContext();
+
+    plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource,
+        step, res, minAlloc, maxAlloc, "dedicated", null, true, context);
+  }
+
+  public void runTest() throws IOException, PlanningException {
+
+    long period = 1;
+    if (recurrenceExpression != null) {
+      period = Long.parseLong(recurrenceExpression);
+    }
+
+    try {
+      RLESparseResourceAllocation rle = generateRLEAlloc(period);
+
+      // Generate the intervalMap (trimming out-of-period entries)
+      Map<ReservationInterval, Resource> reservationIntervalResourceMap;
+      if (period > 1) {
+        rle = new PeriodicRLESparseResourceAllocation(rle, period);
+        reservationIntervalResourceMap =
+            ReservationSystemTestUtil.toAllocation(rle, 0, period);
+      } else {
+        reservationIntervalResourceMap = ReservationSystemTestUtil
+            .toAllocation(rle, Long.MIN_VALUE, Long.MAX_VALUE);
+      }
+
+      ReservationDefinition rDef =
+          ReservationSystemTestUtil.createSimpleReservationDefinition(
+              initTime % period, initTime % period + duration + 1, duration, 1,
+              recurrenceExpression);
+
+      // perform multiple submissions where required
+      for (int i = 0; i < numSubmissions; i++) {
+
+        long rstart = rle.getEarliestStartTime();
+        long rend = rle.getLatestNonNullTime();
+
+        InMemoryReservationAllocation resAlloc =
+            new InMemoryReservationAllocation(
+                ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
+                "dedicated", rstart, rend, reservationIntervalResourceMap, res,
+                minAlloc);
+
+        assertTrue(plan.toString(), plan.addReservation(resAlloc, false));
+      }
+      // fail if error was expected
+      if (expectedError != null) {
+        System.out.println(plan.toString());
+        fail();
+      }
+    } catch (Exception e) {
+      if (expectedError == null || !e.getClass().getCanonicalName()
+          .equals(expectedError.getCanonicalName())) {
+        // fail on unexpected errors
+        throw e;
+      }
+    }
+  }
+
+  private RLESparseResourceAllocation generateRLEAlloc(long period) {
+    RLESparseResourceAllocation rle =
+        new RLESparseResourceAllocation(new DefaultResourceCalculator());
+
+    Resource alloc = Resources.multiply(minAlloc, height * totCont);
+
+    // place the allocation at initTime's offset within the period
+    long rStart = initTime % period;
+    long rEnd = initTime % period + duration;
+
+
+    // handle wrap-around
+    if (period > 1 && rEnd > period) {
+      long diff = rEnd - period;
+      rEnd = period;
+
+      // handle multiple wrap-arounds (e.g., 5h duration on a 2h periodicity)
+      if(duration > period) {
+        rle.addInterval(new ReservationInterval(0, period),
+            Resources.multiply(alloc, duration / period - 1));
+        rle.addInterval(new ReservationInterval(0, diff % period), alloc);
+      } else {
+        rle.addInterval(new ReservationInterval(0, diff), alloc);
+      }
+    }
+
+
+    rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
+    return rle;
+  }
+
+  public abstract SharingPolicy getInitializedPolicy();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index 5337e06..eef86a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/******************************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *******************************************************************************/
+ *****************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import static org.mockito.Matchers.any;
@@ -466,4 +466,28 @@ public class ReservationSystemTestUtil {
   public static Resource calculateClusterResource(int numContainers) {
     return Resource.newInstance(numContainers * 1024, numContainers);
   }
+
+
+  public static Map<ReservationInterval, Resource> toAllocation(
+      RLESparseResourceAllocation rle, long start, long end) {
+    Map<ReservationInterval, Resource> resAlloc = new TreeMap<>();
+
+    for (Map.Entry<Long, Resource> e : rle.getCumulative().entrySet()) {
+      Long nextKey = rle.getCumulative().higherKey(e.getKey());
+      if (nextKey == null) {
+        break;
+      } else {
+        if (e.getKey() >= start && e.getKey() <= end && nextKey >= start
+            && nextKey <= end) {
+          resAlloc.put(new ReservationInterval(e.getKey(), nextKey),
+              e.getValue());
+        }
+      }
+    }
+
+    return resAlloc;
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 2dee60c..d054d3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -6,9 +6,9 @@
  *   to you under the Apache License, Version 2.0 (the
  *   "License"); you may not use this file except in compliance
  *   with the License.  You may obtain a copy of the License at
- *  
+ *
  *       http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *   Unless required by applicable law or agreed to in writing, software
  *   distributed under the License is distributed on an "AS IS" BASIS,
  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,269 +17,118 @@
  *******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.Arrays;
+import java.util.Collection;
 
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * This class tests the {@code CapacityOverTimePolicy} sharing policy.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
+public class TestCapacityOverTimePolicy extends BaseSharingPolicyTest {
+
+  final static long ONEDAY = 86400 * 1000;
+  final static long ONEHOUR = 3600 * 1000;
+  final static long ONEMINUTE = 60 * 1000;
+  final static String TWODAYPERIOD = "7200000";
+  final static String ONEDAYPERIOD = "86400000";
+
+  @Parameterized.Parameters(name = "Duration {0}, height {1}," +
+          " submission {2}, periodic {3})")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+
+        // easy fit
+        {ONEHOUR, 0.25, 1, null, null },
+        {ONEHOUR, 0.25, 1, TWODAYPERIOD, null },
+        {ONEHOUR, 0.25, 1, ONEDAYPERIOD, null },
+
+        // instantaneous high, but fit integral and inst limits
+        {ONEMINUTE, 0.74, 1, null, null },
+        {ONEMINUTE, 0.74, 1, TWODAYPERIOD, null },
+        {ONEMINUTE, 0.74, 1, ONEDAYPERIOD, null },
+
+        // barely exceed the instantaneous limit (0.76 > 0.75 cap)
+        {ONEHOUR, 0.76, 1, null, PlanningQuotaException.class },
+        {ONEHOUR, 0.76, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {ONEHOUR, 0.76, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // overcommit with single reservation
+        {ONEHOUR, 1.1, 1, null, PlanningQuotaException.class },
+        {ONEHOUR, 1.1, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {ONEHOUR, 1.1, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // barely fit with multiple reservations (instantaneously, lowering to
+        // 1min to fit integral)
+        {ONEMINUTE, 0.25, 3, null, null },
+        {ONEMINUTE, 0.25, 3, TWODAYPERIOD, null },
+        {ONEMINUTE, 0.25, 3, ONEDAYPERIOD, null },
+
+        // overcommit with multiple reservations (instantaneously)
+        {ONEMINUTE, 0.25, 4, null, PlanningQuotaException.class },
+        {ONEMINUTE, 0.25, 4, TWODAYPERIOD, PlanningQuotaException.class },
+        {ONEMINUTE, 0.25, 4, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // (non-periodic) reservation longer than window
+        {25 * ONEHOUR, 0.25, 1, null, PlanningQuotaException.class },
+        {25 * ONEHOUR, 0.25, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {25 * ONEHOUR, 0.25, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // (non-periodic) reservation longer than window
+        {25 * ONEHOUR, 0.05, 5, null, PlanningQuotaException.class },
+        {25 * ONEHOUR, 0.05, 5, TWODAYPERIOD, PlanningQuotaException.class },
+        {25 * ONEHOUR, 0.05, 5, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // overcommit integral
+        {ONEDAY, 0.26, 1, null, PlanningQuotaException.class },
+        {2 * ONEHOUR, 0.26, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {2 * ONEDAY, 0.26, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // overcommit integral
+        {ONEDAY / 2, 0.51, 1, null, PlanningQuotaException.class },
+        {2 * ONEHOUR / 2, 0.51, 1, TWODAYPERIOD,
+            PlanningQuotaException.class },
+        {2 * ONEDAY / 2, 0.51, 1, ONEDAYPERIOD, PlanningQuotaException.class }
+
+    });
+  }
 
-public class TestCapacityOverTimePolicy {
-
-  long timeWindow;
-  long step;
-  float avgConstraint;
-  float instConstraint;
-  long initTime;
-
-  InMemoryPlan plan;
-  ReservationAgent mAgent;
-  Resource minAlloc;
-  ResourceCalculator res;
-  Resource maxAlloc;
-
-  int totCont = 1000000;
-
-  @Before
-  public void setup() throws Exception {
+  @Override
+  public SharingPolicy getInitializedPolicy() {
 
     // 24h window
-    timeWindow = 86400000L;
+    long timeWindow = 86400000L;
+
     // 1 sec step
-    step = 1000L;
+    long step = 1000L;
 
     // 25% avg cap on capacity
-    avgConstraint = 25;
+    float avgConstraint = 25;
 
-    // 70% instantaneous cap on capacity
+    // 75% instantaneous cap on capacity
-    instConstraint = 70;
+    float instConstraint = 75;
 
-    initTime = System.currentTimeMillis();
-    minAlloc = Resource.newInstance(1024, 1);
-    res = new DefaultResourceCalculator();
-    maxAlloc = Resource.newInstance(1024 * 8, 8);
-
-    mAgent = mock(ReservationAgent.class);
-    QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
     String reservationQ =
         ReservationSystemTestUtil.getFullReservationQueueName();
-    Resource clusterResource =
-        ReservationSystemTestUtil.calculateClusterResource(totCont);
-    ReservationSchedulerConfiguration conf =
-        ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
-            instConstraint, avgConstraint);
+    conf = ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
+        instConstraint, avgConstraint);
     CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
     policy.init(reservationQ, conf);
-    RMContext context = ReservationSystemTestUtil.createMockRMContext();
-
-    plan =
-        new InMemoryPlan(rootQueueMetrics, policy, mAgent,
-            clusterResource, step, res, minAlloc, maxAlloc,
-            "dedicated", null, true, context);
-  }
-
-  public int[] generateData(int length, int val) {
-    int[] data = new int[length];
-    for (int i = 0; i < length; i++) {
-      data[i] = val;
-    }
-    return data;
-  }
-
-  @Test
-  public void testSimplePass() throws IOException, PlanningException {
-    // generate allocation that simply fit within all constraints
-    int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
-
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-  }
-
-  @Test(expected = PlanningException.class)
-  public void testAllocationLargerThanValidWindow() throws IOException,
-      PlanningException {
-    // generate allocation that exceed the validWindow
-    int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont));
-
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
+    return policy;
   }
 
   @Test
-  public void testSimplePass2() throws IOException, PlanningException {
-    // generate allocation from single tenant that exceed avg momentarily but
-    // fit within
-    // max instantanesou
-    int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-  }
-
-  @Test
-  public void testMultiTenantPass() throws IOException, PlanningException {
-    // generate allocation from multiple tenants that barely fit in tot capacity
-    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    for (int i = 0; i < 4; i++) {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-    }
-  }
-
-  @Test(expected = PlanningQuotaException.class)
-  public void testMultiTenantFail() throws IOException, PlanningException {
-    // generate allocation from multiple tenants that exceed tot capacity
-    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    for (int i = 0; i < 5; i++) {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-    }
-  }
-
-  @Test(expected = PlanningQuotaException.class)
-  public void testInstFail() throws IOException, PlanningException {
-    // generate allocation that exceed the instantaneous cap single-show
-    int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-    Assert.fail("should not have accepted this");
-  }
-
-  @Test
-  public void testInstFailBySum() throws IOException, PlanningException {
-    // generate allocation that exceed the instantaneous cap by sum
-    int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-    try {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-      Assert.fail();
-    } catch (PlanningQuotaException p) {
-      // expected
-    }
-  }
-
-  @Test(expected = PlanningQuotaException.class)
-  public void testFailAvg() throws IOException, PlanningException {
-    // generate an allocation which violates the 25% average single-shot
-    Map<ReservationInterval, Resource> req =
-        new TreeMap<ReservationInterval, Resource>();
-    long win = timeWindow / 2 + 100;
-    int cont = (int) Math.ceil(0.5 * totCont);
-    req.put(new ReservationInterval(initTime, initTime + win),
-        ReservationSystemUtil.toResource(
-            ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-                cont)));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + win, win);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + win, req, res, minAlloc), false));
-  }
-
-  @Test
-  public void testFailAvgBySum() throws IOException, PlanningException {
-    // generate an allocation which violates the 25% average by sum
-    Map<ReservationInterval, Resource> req =
-        new TreeMap<ReservationInterval, Resource>();
-    long win = 86400000 / 4 + 1;
-    int cont = (int) Math.ceil(0.5 * totCont);
-    req.put(new ReservationInterval(initTime, initTime + win),
-        ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
-            .newInstance(1024, 1), cont)));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + win, win);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + win, req, res, minAlloc), false));
-
-    try {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-              "dedicated", initTime, initTime + win, req, res, minAlloc), false));
-
-      Assert.fail("should not have accepted this");
-    } catch (PlanningQuotaException e) {
-      // expected
-    }
+  public void testAllocation() throws IOException, PlanningException {
+    runTest();
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
index c5edaf0..accdf24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
@@ -6,9 +6,9 @@
  *   to you under the Apache License, Version 2.0 (the
  *   "License"); you may not use this file except in compliance
  *   with the License.  You may obtain a copy of the License at
- *  
+ *
  *       http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *   Unless required by applicable law or agreed to in writing, software
  *   distributed under the License is distributed on an "AS IS" BASIS,
  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,145 +17,70 @@
  *******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.junit.Before;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * This class tests {@code NoOverCommitPolicy} sharing policy.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
+public class TestNoOverCommitPolicy extends BaseSharingPolicyTest {
+
+  final static long ONEHOUR = 3600 * 1000;
+  final static String TWOHOURPERIOD = "7200000";
+
+  @Parameterized.Parameters(name = "Duration {0}, height {1}," +
+          " submissions {2}, periodic {3})")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+
+        // easy fit
+        {ONEHOUR, 0.25, 1, null, null },
+        {ONEHOUR, 0.25, 1, TWOHOURPERIOD, null },
+
+        // barely fit
+        {ONEHOUR, 1, 1, null, null },
+        {ONEHOUR, 1, 1, TWOHOURPERIOD, null },
+
+        // overcommit with single reservation
+        {ONEHOUR, 1.1, 1, null, ResourceOverCommitException.class },
+        {ONEHOUR, 1.1, 1, TWOHOURPERIOD, ResourceOverCommitException.class },
+
+        // barely fit with multiple reservations
+        {ONEHOUR, 0.25, 4, null, null },
+        {ONEHOUR, 0.25, 4, TWOHOURPERIOD, null },
+
+        // overcommit with multiple reservations
+        {ONEHOUR, 0.25, 5, null, ResourceOverCommitException.class },
+        {ONEHOUR, 0.25, 5, TWOHOURPERIOD, ResourceOverCommitException.class }
+
+    });
+  }
 
-public class TestNoOverCommitPolicy {
-
-  long step;
-  long initTime;
-
-  InMemoryPlan plan;
-  ReservationAgent mAgent;
-  Resource minAlloc;
-  ResourceCalculator res;
-  Resource maxAlloc;
-
-  int totCont = 1000000;
-
-  @Before
-  public void setup() throws Exception {
-
-    // 1 sec step
-    step = 1000L;
-
-    initTime = System.currentTimeMillis();
-    minAlloc = Resource.newInstance(1024, 1);
-    res = new DefaultResourceCalculator();
-    maxAlloc = Resource.newInstance(1024 * 8, 8);
-
-    mAgent = mock(ReservationAgent.class);
+  @Override
+  public SharingPolicy getInitializedPolicy() {
     String reservationQ =
         ReservationSystemTestUtil.getFullReservationQueueName();
-    QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
-    Resource clusterResource =
-        ReservationSystemTestUtil.calculateClusterResource(totCont);
-    ReservationSchedulerConfiguration conf = mock
-        (ReservationSchedulerConfiguration.class);
-    NoOverCommitPolicy policy = new NoOverCommitPolicy();
+    conf = new CapacitySchedulerConfiguration();
+    SharingPolicy policy = new NoOverCommitPolicy();
     policy.init(reservationQ, conf);
-    RMContext context = ReservationSystemTestUtil.createMockRMContext();
-
-    plan =
-        new InMemoryPlan(rootQueueMetrics, policy, mAgent,
-            clusterResource, step, res, minAlloc, maxAlloc,
-            "dedicated", null, true, context);
-  }
-
-  public int[] generateData(int length, int val) {
-    int[] data = new int[length];
-    for (int i = 0; i < length; i++) {
-      data[i] = val;
-    }
-    return data;
-  }
-
-  @Test
-  public void testSingleUserEasyFitPass() throws IOException, PlanningException {
-    // generate allocation that easily fit within resource constraints
-    int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-  }
-
-  @Test
-  public void testSingleUserBarelyFitPass() throws IOException,
-      PlanningException {
-    // generate allocation from single tenant that barely fit
-    int[] f = generateData(3600, totCont);
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-  }
-
-  @Test(expected = ResourceOverCommitException.class)
-  public void testSingleFail() throws IOException, PlanningException {
-    // generate allocation from single tenant that exceed capacity
-    int[] f = generateData(3600, (int) (1.1 * totCont));
-    plan.addReservation(new InMemoryReservationAllocation(
-        ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-        "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
-            .generateAllocation(initTime, step, f), res, minAlloc), false);
+    return policy;
   }
 
   @Test
-  public void testMultiTenantPass() throws IOException, PlanningException {
-    // generate allocation from multiple tenants that barely fit in tot capacity
-    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    for (int i = 0; i < 4; i++) {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-    }
+  public void testAllocation() throws IOException, PlanningException {
+    runTest();
   }
 
-  @Test(expected = ResourceOverCommitException.class)
-  public void testMultiTenantFail() throws IOException, PlanningException {
-    // generate allocation from multiple tenants that exceed tot capacity
-    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    for (int i = 0; i < 5; i++) {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-    }
-  }
-}
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org