You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/08 21:46:21 UTC

[1/2] hadoop git commit: YARN-5164. Use plan RLE to improve CapacityOverTimePolicy efficiency

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 1421196d2 -> 2c051dbe9


YARN-5164. Use plan RLE to improve CapacityOverTimePolicy efficiency

(cherry picked from commit d383bfdcd40c2315197fa5f85c3e5cb3bb83167f)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2f882558
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2f882558
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2f882558

Branch: refs/heads/branch-2
Commit: 2f8825588ee313057d565928f01d45e75206970d
Parents: 1421196
Author: Chris Douglas <cd...@apache.org>
Authored: Mon Jul 25 16:37:50 2016 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Fri Sep 8 14:44:34 2017 -0700

----------------------------------------------------------------------
 .../reservation/CapacityOverTimePolicy.java     | 299 +++++++++++--------
 .../reservation/TestCapacityOverTimePolicy.java |  20 +-
 2 files changed, 185 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f882558/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
index 07bee99..bb1a4e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -1,26 +1,21 @@
 /*******************************************************************************
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- *       http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  *******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import java.util.Date;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -28,9 +23,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
 /**
  * This policy enforces a time-extended notion of Capacity. In particular it
  * guarantees that the allocation received in input when combined with all
@@ -39,11 +37,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  * validWindow, the integral of the allocations for a user (sum of the currently
  * submitted allocation and all prior allocations for the user) does not exceed
  * validWindow * maxAvg.
- * 
+ *
  * This allows flexibility, in the sense that an allocation can instantaneously
  * use large portions of the available capacity, but prevents abuses by bounding
  * the average use over time.
- * 
+ *
  * By controlling maxInst, maxAvg, validWindow the administrator configuring
  * this policy can obtain a behavior ranging from instantaneously enforced
  * capacity (akin to existing queues), or fully flexible allocations (likely
@@ -51,7 +49,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  */
 @LimitedPrivate("yarn")
 @Unstable
-public class CapacityOverTimePolicy implements SharingPolicy {
+public class CapacityOverTimePolicy extends NoOverCommitPolicy {
 
   private ReservationSchedulerConfiguration conf;
   private long validWindow;
@@ -68,123 +66,155 @@ public class CapacityOverTimePolicy implements SharingPolicy {
     validWindow = this.conf.getReservationWindow(reservationQueuePath);
     maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
     maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
-  };
+  }
 
+  /**
+   * The validation algorithm walks over the RLE encoded allocation and
+   * visits every transition point (i.e., when the start or end of the
+   * checking window encounters a value in the RLE). At each such point it
+   * checks whether the integral computed exceeds the quota limit. Note that
+   * this might not find the exact time of a violation, but if a violation
+   * exists it will find it. The advantage is a much lower number of checks
+   * as compared to time-slot by time-slot checks.
+   *
+   * @param plan the plan to validate against
+   * @param reservation the reservation allocation to test.
+   * @throws PlanningException if the validation fails.
+   */
   @Override
   public void validate(Plan plan, ReservationAllocation reservation)
       throws PlanningException {
 
-    // this is entire method invoked under a write-lock on the plan, no need
-    // to synchronize accesses to the plan further
 
-    // Try to verify whether there is already a reservation with this ID in
-    // the system (remove its contribution during validation to simulate a
-    // try-n-swap
-    // update).
-    ReservationAllocation oldReservation =
+    // rely on NoOverCommitPolicy to check for: 1) user-match, 2) physical
+    // cluster limits, and 3) maxInst (via override of available)
+    try {
+      super.validate(plan, reservation);
+    } catch (PlanningException p) {
+      //wrap it in proper quota exception
+      throw new PlanningQuotaException(p);
+    }
+
+    //---- check for integral violations of capacity --------
+
+    // Gather a view of what to check (curr allocation of user, minus old
+    // version of this reservation, plus new version)
+    RLESparseResourceAllocation consumptionForUserOverTime =
+        plan.getConsumptionForUserOverTime(reservation.getUser(),
+            reservation.getStartTime() - validWindow,
+            reservation.getEndTime() + validWindow);
+
+    ReservationAllocation old =
         plan.getReservationById(reservation.getReservationId());
+    if (old != null) {
+      consumptionForUserOverTime = RLESparseResourceAllocation
+          .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
+              consumptionForUserOverTime, old.getResourcesOverTime(),
+              RLEOperator.add, reservation.getStartTime() - validWindow,
+              reservation.getEndTime() + validWindow);
+    }
 
-    long startTime = reservation.getStartTime();
-    long endTime = reservation.getEndTime();
-    long step = plan.getStep();
+    RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime();
 
-    Resource planTotalCapacity = plan.getTotalCapacity();
+    RLESparseResourceAllocation toCheck = RLESparseResourceAllocation
+        .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
+            consumptionForUserOverTime, resRLE, RLEOperator.add, Long.MIN_VALUE,
+            Long.MAX_VALUE);
 
-    Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg);
-    Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
+    NavigableMap<Long, Resource> integralUp = new TreeMap<>();
+    NavigableMap<Long, Resource> integralDown = new TreeMap<>();
 
-    // define variable that will store integral of resources (need diff class to
-    // avoid overflow issues for long/large allocations)
+    long prevTime = toCheck.getEarliestStartTime();
+    IntegralResource prevResource = new IntegralResource(0L, 0L);
     IntegralResource runningTot = new IntegralResource(0L, 0L);
-    IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
-    maxAllowed.multiplyBy(validWindow / step);
-
-    RLESparseResourceAllocation userCons =
-        plan.getConsumptionForUserOverTime(reservation.getUser(), startTime
-            - validWindow, endTime + validWindow);
-
-    // check that the resources offered to the user during any window of length
-    // "validWindow" overlapping this allocation are within maxAllowed
-    // also enforce instantaneous and physical constraints during this pass
-    for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
-
-      Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
-      Resource currExistingAllocForUser = userCons.getCapacityAtTime(t);
-      Resource currNewAlloc = reservation.getResourcesAtTime(t);
-      Resource currOldAlloc = Resources.none();
-      if (oldReservation != null) {
-        currOldAlloc = oldReservation.getResourcesAtTime(t);
-      }
 
-      // throw exception if the cluster is overcommitted
-      // tot_allocated - old + new > capacity
-      Resource inst =
-          Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc),
-              currOldAlloc);
-      if (Resources.greaterThan(plan.getResourceCalculator(),
-          planTotalCapacity, inst, planTotalCapacity)) {
-        throw new ResourceOverCommitException(" Resources at time " + t
-            + " would be overcommitted (" + inst + " over "
-            + plan.getTotalCapacity() + ") by accepting reservation: "
-            + reservation.getReservationId());
-      }
+    // add intermediate points
+    Map<Long, Resource> temp = new TreeMap<>();
+    for (Map.Entry<Long, Resource> pointToCheck : toCheck.getCumulative()
+        .entrySet()) {
 
-      // throw exception if instantaneous limits are violated
-      // tot_alloc_to_this_user - old + new > inst_limit
-      if (Resources.greaterThan(plan.getResourceCalculator(),
-          planTotalCapacity, Resources.subtract(
-              Resources.add(currExistingAllocForUser, currNewAlloc),
-              currOldAlloc), maxInsRes)) {
-        throw new PlanningQuotaException("Instantaneous quota capacity "
-            + maxInst + " would be passed at time " + t
-            + " by accepting reservation: " + reservation.getReservationId());
-      }
+      Long timeToCheck = pointToCheck.getKey();
+      Resource resourceToCheck = pointToCheck.getValue();
 
-      // throw exception if the running integral of utilization over validWindow
-      // is violated. We perform a delta check, adding/removing instants at the
-      // boundary of the window from runningTot.
-
-      // runningTot = previous_runningTot + currExistingAllocForUser +
-      // currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc;
-
-      // Where:
-      // 1) currNewAlloc, currExistingAllocForUser represent the contribution of
-      // the instant in time added in this pass.
-      // 2) pastNewAlloc, pastOldAlloc are the contributions relative to time
-      // instants that are being retired from the the window
-      // 3) currOldAlloc is the contribution (if any) of the previous version of
-      // this reservation (the one we are updating)
-
-      runningTot.add(currExistingAllocForUser);
-      runningTot.add(currNewAlloc);
-      runningTot.subtract(currOldAlloc);
-
-      // expire contributions from instant in time before (t - validWindow)
-      if (t > startTime) {
-        Resource pastOldAlloc = userCons.getCapacityAtTime(t - validWindow);
-        Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
-
-        // runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
-        runningTot.subtract(pastOldAlloc);
-        runningTot.subtract(pastNewAlloc);
+      Long nextPoint = toCheck.getCumulative().higherKey(timeToCheck);
+      if (nextPoint == null || toCheck.getCumulative().get(nextPoint) == null) {
+        continue;
       }
-
-      // check integral
-      // runningTot > maxAvg * validWindow
-      // NOTE: we need to use comparator of IntegralResource directly, as
-      // Resource and ResourceCalculator assume "int" amount of resources,
-      // which is not sufficient when comparing integrals (out-of-bound)
-      if (maxAllowed.compareTo(runningTot) < 0) {
-        throw new PlanningQuotaException(
-            "Integral (avg over time) quota capacity " + maxAvg
-                + " over a window of " + validWindow / 1000 + " seconds, "
-                + " would be passed at time " + t + "(" + new Date(t)
-                + ") by accepting reservation: "
-                + reservation.getReservationId());
+      for (int i = 1; i <= (nextPoint - timeToCheck) / validWindow; i++) {
+        temp.put(timeToCheck + (i * validWindow), resourceToCheck);
+      }
+    }
+    temp.putAll(toCheck.getCumulative());
+
+    // compute point-wise integral for the up-fronts and down-fronts
+    for (Map.Entry<Long, Resource> currPoint : temp.entrySet()) {
+
+      Long currTime = currPoint.getKey();
+      Resource currResource = currPoint.getValue();
+
+      //add to running total current contribution
+      prevResource.multiplyBy(currTime - prevTime);
+      runningTot.add(prevResource);
+      integralUp.put(currTime, normalizeToResource(runningTot, validWindow));
+      integralDown.put(currTime + validWindow,
+          normalizeToResource(runningTot, validWindow));
+
+      if (currResource != null) {
+        prevResource.memory = currResource.getMemorySize();
+        prevResource.vcores = currResource.getVirtualCores();
+      } else {
+        prevResource.memory = 0L;
+        prevResource.vcores = 0L;
       }
+      prevTime = currTime;
+    }
+
+    // compute final integral as delta of up minus down transitions
+    RLESparseResourceAllocation intUp =
+        new RLESparseResourceAllocation(integralUp,
+            plan.getResourceCalculator());
+    RLESparseResourceAllocation intDown =
+        new RLESparseResourceAllocation(integralDown,
+            plan.getResourceCalculator());
+
+    RLESparseResourceAllocation integral = RLESparseResourceAllocation
+        .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), intUp,
+            intDown, RLEOperator.subtract, Long.MIN_VALUE, Long.MAX_VALUE);
+
+    // define over-time integral limit
+    // note: this is aligned with the normalization done above
+    NavigableMap<Long, Resource> tlimit = new TreeMap<>();
+    Resource maxAvgRes = Resources.multiply(plan.getTotalCapacity(), maxAvg);
+    tlimit.put(toCheck.getEarliestStartTime() - validWindow, maxAvgRes);
+    RLESparseResourceAllocation targetLimit =
+        new RLESparseResourceAllocation(tlimit, plan.getResourceCalculator());
+
+    // compare using merge() limit with integral
+    try {
+      RLESparseResourceAllocation
+          .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
+              targetLimit, integral, RLEOperator.subtractTestNonNegative,
+              reservation.getStartTime() - validWindow,
+              reservation.getEndTime() + validWindow);
+    } catch (PlanningException p) {
+      throw new PlanningQuotaException(
+          "Integral (avg over time) quota capacity " + maxAvg
+              + " over a window of " + validWindow / 1000 + " seconds, "
+              + " would be exceeded by accepting reservation: " + reservation
+              .getReservationId(), p);
     }
   }
 
+  private Resource normalizeToResource(IntegralResource runningTot,
+      long window) {
+    // normalize to fit in windows. Rounding should not impact more than
+    // sub 1 core average allocations. This will all be removed once
+    // Resource moves to long.
+    int memory = (int) Math.round((double) runningTot.memory / window);
+    int vcores = (int) Math.round((double) runningTot.vcores / window);
+    return Resource.newInstance(memory, vcores);
+  }
+
   @Override
   public RLESparseResourceAllocation availableResources(
       RLESparseResourceAllocation available, Plan plan, String user,
@@ -208,21 +238,18 @@ public class CapacityOverTimePolicy implements SharingPolicy {
     // add back in old reservation used resources if any
     ReservationAllocation old = plan.getReservationById(oldId);
     if (old != null) {
-      used =
-          RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
-              Resources.clone(plan.getTotalCapacity()), used,
-              old.getResourcesOverTime(), RLEOperator.subtract, start, end);
+      used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+          Resources.clone(plan.getTotalCapacity()), used,
+          old.getResourcesOverTime(), RLEOperator.subtract, start, end);
     }
 
-    instRLEQuota =
-        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
-            planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start,
-            end);
+    instRLEQuota = RLESparseResourceAllocation
+        .merge(plan.getResourceCalculator(), planTotalCapacity, instRLEQuota,
+            used, RLEOperator.subtract, start, end);
 
-    instRLEQuota =
-        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
-            planTotalCapacity, available, instRLEQuota, RLEOperator.min, start,
-            end);
+    instRLEQuota = RLESparseResourceAllocation
+        .merge(plan.getResourceCalculator(), planTotalCapacity, available,
+            instRLEQuota, RLEOperator.min, start, end);
 
     return instRLEQuota;
   }
@@ -260,11 +287,20 @@ public class CapacityOverTimePolicy implements SharingPolicy {
       vcores += r.getVirtualCores();
     }
 
+    public void add(IntegralResource r) {
+      memory += r.memory;
+      vcores += r.vcores;
+    }
+
     public void subtract(Resource r) {
       memory -= r.getMemorySize();
       vcores -= r.getVirtualCores();
     }
 
+    public IntegralResource negate() {
+      return new IntegralResource(-memory, -vcores);
+    }
+
     public void multiplyBy(long window) {
       memory = memory * window;
       vcores = vcores * window;
@@ -282,8 +318,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
     public String toString() {
       return "<memory:" + memory + ", vCores:" + vcores + ">";
     }
-  }
-
 
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f882558/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 4aed064..2dee60c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -117,6 +116,23 @@ public class TestCapacityOverTimePolicy {
             res, minAlloc), false));
   }
 
+  @Test(expected = PlanningException.class)
+  public void testAllocationLargerThanValidWindow() throws IOException,
+      PlanningException {
+    // generate an allocation that exceeds the validWindow
+    int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont));
+
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
+            "dedicated", initTime, initTime + f.length,
+            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+            res, minAlloc), false));
+  }
+
   @Test
   public void testSimplePass2() throws IOException, PlanningException {
     // generate allocation from single tenant that exceed avg momentarily but
@@ -151,7 +167,7 @@ public class TestCapacityOverTimePolicy {
     }
   }
 
-  @Test(expected = ResourceOverCommitException.class)
+  @Test(expected = PlanningQuotaException.class)
   public void testMultiTenantFail() throws IOException, PlanningException {
     // generate allocation from multiple tenants that exceed tot capacity
     int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-5330. SharingPolicy enhancements required to support recurring reservations in ReservationSystem. (Carlo Curino via Subru).

Posted by cu...@apache.org.
YARN-5330. SharingPolicy enhancements required to support recurring reservations in ReservationSystem. (Carlo Curino via Subru).

(cherry picked from commit fa6137501c1499ae33f6e0e2adc31671a7e782dc)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2c051dbe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2c051dbe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2c051dbe

Branch: refs/heads/branch-2
Commit: 2c051dbe9edf1fa002090205a58ba999914acb0b
Parents: 2f88255
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Sep 7 19:07:17 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Fri Sep 8 14:44:46 2017 -0700

----------------------------------------------------------------------
 .../reservation/CapacityOverTimePolicy.java     |  32 +-
 .../reservation/NoOverCommitPolicy.java         |   8 +-
 .../reservation/BaseSharingPolicyTest.java      | 189 +++++++++++
 .../reservation/ReservationSystemTestUtil.java  |  28 +-
 .../reservation/TestCapacityOverTimePolicy.java | 339 +++++--------------
 .../reservation/TestNoOverCommitPolicy.java     | 185 +++-------
 6 files changed, 388 insertions(+), 393 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
index bb1a4e8..acd5774 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -95,26 +95,29 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
       throw new PlanningQuotaException(p);
     }
 
+    long checkStart = reservation.getStartTime() - validWindow;
+    long checkEnd = reservation.getEndTime() + validWindow;
+
     //---- check for integral violations of capacity --------
 
     // Gather a view of what to check (curr allocation of user, minus old
     // version of this reservation, plus new version)
     RLESparseResourceAllocation consumptionForUserOverTime =
         plan.getConsumptionForUserOverTime(reservation.getUser(),
-            reservation.getStartTime() - validWindow,
-            reservation.getEndTime() + validWindow);
+            checkStart, checkEnd);
 
     ReservationAllocation old =
         plan.getReservationById(reservation.getReservationId());
     if (old != null) {
-      consumptionForUserOverTime = RLESparseResourceAllocation
-          .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
-              consumptionForUserOverTime, old.getResourcesOverTime(),
-              RLEOperator.add, reservation.getStartTime() - validWindow,
-              reservation.getEndTime() + validWindow);
+      consumptionForUserOverTime =
+          RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+              plan.getTotalCapacity(), consumptionForUserOverTime,
+              old.getResourcesOverTime(checkStart, checkEnd), RLEOperator.add,
+              checkStart, checkEnd);
     }
 
-    RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime();
+    RLESparseResourceAllocation resRLE =
+        reservation.getResourcesOverTime(checkStart, checkEnd);
 
     RLESparseResourceAllocation toCheck = RLESparseResourceAllocation
         .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
@@ -191,11 +194,11 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
 
     // compare using merge() limit with integral
     try {
-      RLESparseResourceAllocation
-          .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
-              targetLimit, integral, RLEOperator.subtractTestNonNegative,
-              reservation.getStartTime() - validWindow,
-              reservation.getEndTime() + validWindow);
+
+      RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+          plan.getTotalCapacity(), targetLimit, integral,
+          RLEOperator.subtractTestNonNegative, checkStart, checkEnd);
+
     } catch (PlanningException p) {
       throw new PlanningQuotaException(
           "Integral (avg over time) quota capacity " + maxAvg
@@ -240,7 +243,8 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
     if (old != null) {
       used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
           Resources.clone(plan.getTotalCapacity()), used,
-          old.getResourcesOverTime(), RLEOperator.subtract, start, end);
+          old.getResourcesOverTime(start, end), RLEOperator.subtract, start,
+          end);
     }
 
     instRLEQuota = RLESparseResourceAllocation

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
index 49d4702..98ef582 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -40,13 +40,17 @@ public class NoOverCommitPolicy implements SharingPolicy {
 
     RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
         reservation.getUser(), reservation.getReservationId(),
-        reservation.getStartTime(), reservation.getEndTime(), 0);
+        reservation.getStartTime(), reservation.getEndTime(),
+        reservation.getPeriodicity());
 
     // test the reservation does not exceed what is available
     try {
+
+      RLESparseResourceAllocation ask = reservation.getResourcesOverTime(
+              reservation.getStartTime(), reservation.getEndTime());
       RLESparseResourceAllocation
           .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
-              available, reservation.getResourcesOverTime(),
+              available, ask,
               RLESparseResourceAllocation.RLEOperator.subtractTestNonNegative,
               reservation.getStartTime(), reservation.getEndTime());
     } catch (PlanningException p) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
new file mode 100644
index 0000000..294564a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
@@ -0,0 +1,189 @@
+/*******************************************************************************
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import net.jcip.annotations.NotThreadSafe;
+
+/**
+ * This class is a base test for {@code SharingPolicy} implementors.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
+public abstract class BaseSharingPolicyTest {
+
+  @Parameterized.Parameter(value = 0)
+  public long duration;
+
+  @Parameterized.Parameter(value = 1)
+  public double height;
+
+  @Parameterized.Parameter(value = 2)
+  public int numSubmissions;
+
+  @Parameterized.Parameter(value = 3)
+  public String recurrenceExpression;
+
+  @Parameterized.Parameter(value = 4)
+  public Class expectedError;
+
+  private long step;
+  private long initTime;
+
+  private InMemoryPlan plan;
+  private ReservationAgent mAgent;
+  private Resource minAlloc;
+  private ResourceCalculator res;
+  private Resource maxAlloc;
+
+  private int totCont = 1000;
+
+  protected ReservationSchedulerConfiguration conf;
+
+  @Before
+  public void setup() {
+    // 1 sec step
+    step = 1000L;
+    initTime = System.currentTimeMillis();
+
+    minAlloc = Resource.newInstance(1024, 1);
+    res = new DefaultResourceCalculator();
+    maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+    mAgent = mock(ReservationAgent.class);
+
+    QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
+    Resource clusterResource =
+        ReservationSystemTestUtil.calculateClusterResource(totCont);
+
+    // invoke implementors initialization of policy
+    SharingPolicy policy = getInitializedPolicy();
+
+    RMContext context = ReservationSystemTestUtil.createMockRMContext();
+
+    plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource,
+        step, res, minAlloc, maxAlloc, "dedicated", null, true, context);
+  }
+
+  public void runTest() throws IOException, PlanningException {
+
+    long period = 1;
+    if (recurrenceExpression != null) {
+      period = Long.parseLong(recurrenceExpression);
+    }
+
+    try {
+      RLESparseResourceAllocation rle = generateRLEAlloc(period);
+
+      // Generate the intervalMap (trimming out-of-period entries)
+      Map<ReservationInterval, Resource> reservationIntervalResourceMap;
+      if (period > 1) {
+        rle = new PeriodicRLESparseResourceAllocation(rle, period);
+        reservationIntervalResourceMap =
+            ReservationSystemTestUtil.toAllocation(rle, 0, period);
+      } else {
+        reservationIntervalResourceMap = ReservationSystemTestUtil
+            .toAllocation(rle, Long.MIN_VALUE, Long.MAX_VALUE);
+      }
+
+      ReservationDefinition rDef =
+          ReservationSystemTestUtil.createSimpleReservationDefinition(
+              initTime % period, initTime % period + duration + 1, duration, 1,
+              recurrenceExpression);
+
+      // perform multiple submissions where required
+      for (int i = 0; i < numSubmissions; i++) {
+
+        long rstart = rle.getEarliestStartTime();
+        long rend = rle.getLatestNonNullTime();
+
+        InMemoryReservationAllocation resAlloc =
+            new InMemoryReservationAllocation(
+                ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
+                "dedicated", rstart, rend, reservationIntervalResourceMap, res,
+                minAlloc);
+
+        assertTrue(plan.toString(), plan.addReservation(resAlloc, false));
+      }
+      // fail if error was expected
+      if (expectedError != null) {
+        System.out.println(plan.toString());
+        fail();
+      }
+    } catch (Exception e) {
+      if (expectedError == null || !e.getClass().getCanonicalName()
+          .equals(expectedError.getCanonicalName())) {
+        // fail on unexpected errors
+        throw e;
+      }
+    }
+  }
+
+  private RLESparseResourceAllocation generateRLEAlloc(long period) {
+    RLESparseResourceAllocation rle =
+        new RLESparseResourceAllocation(new DefaultResourceCalculator());
+
+    Resource alloc = Resources.multiply(minAlloc, height * totCont);
+
+    // express the reservation start/end as offsets within the period
+    long rStart = initTime % period;
+    long rEnd = initTime % period + duration;
+
+
+    // handle wrap-around
+    if (period > 1 && rEnd > period) {
+      long diff = rEnd - period;
+      rEnd = period;
+
+      // handle multiple wrap-arounds (e.g., 5h duration on a 2h periodicity)
+      if(duration > period) {
+        rle.addInterval(new ReservationInterval(0, period),
+            Resources.multiply(alloc, duration / period - 1));
+        rle.addInterval(new ReservationInterval(0, diff % period), alloc);
+      } else {
+        rle.addInterval(new ReservationInterval(0, diff), alloc);
+      }
+    }
+
+
+    rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
+    return rle;
+  }
+
+  public abstract SharingPolicy getInitializedPolicy();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index 5337e06..eef86a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/******************************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *******************************************************************************/
+ *****************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import static org.mockito.Matchers.any;
@@ -466,4 +466,28 @@ public class ReservationSystemTestUtil {
   public static Resource calculateClusterResource(int numContainers) {
     return Resource.newInstance(numContainers * 1024, numContainers);
   }
+
+
+  public static Map<ReservationInterval, Resource> toAllocation(
+      RLESparseResourceAllocation rle, long start, long end) {
+    Map<ReservationInterval, Resource> resAlloc = new TreeMap<>();
+
+    for (Map.Entry<Long, Resource> e : rle.getCumulative().entrySet()) {
+      Long nextKey = rle.getCumulative().higherKey(e.getKey());
+      if (nextKey == null) {
+        break;
+      } else {
+        if (e.getKey() >= start && e.getKey() <= end && nextKey >= start
+            && nextKey <= end) {
+          resAlloc.put(new ReservationInterval(e.getKey(), nextKey),
+              e.getValue());
+        }
+      }
+    }
+
+    return resAlloc;
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 2dee60c..d054d3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -6,9 +6,9 @@
  *   to you under the Apache License, Version 2.0 (the
  *   "License"); you may not use this file except in compliance
  *   with the License.  You may obtain a copy of the License at
- *  
+ *
  *       http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *   Unless required by applicable law or agreed to in writing, software
  *   distributed under the License is distributed on an "AS IS" BASIS,
  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,269 +17,118 @@
  *******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.Arrays;
+import java.util.Collection;
 
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * This class tests the {@code CapacityOverTimePolicy} sharing policy.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
+public class TestCapacityOverTimePolicy extends BaseSharingPolicyTest {
+
+  final static long ONEDAY = 86400 * 1000;
+  final static long ONEHOUR = 3600 * 1000;
+  final static long ONEMINUTE = 60 * 1000;
+  final static String TWODAYPERIOD = "7200000";
+  final static String ONEDAYPERIOD = "86400000";
+
+  @Parameterized.Parameters(name = "Duration {0}, height {1}," +
+          " submission {2}, periodic {3})")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+
+        // easy fit
+        {ONEHOUR, 0.25, 1, null, null },
+        {ONEHOUR, 0.25, 1, TWODAYPERIOD, null },
+        {ONEHOUR, 0.25, 1, ONEDAYPERIOD, null },
+
+        // instantaneous high, but fit integral and inst limits
+        {ONEMINUTE, 0.74, 1, null, null },
+        {ONEMINUTE, 0.74, 1, TWODAYPERIOD, null },
+        {ONEMINUTE, 0.74, 1, ONEDAYPERIOD, null },
+
+        // barely exceed the instantaneous limit
+        {ONEHOUR, 0.76, 1, null, PlanningQuotaException.class },
+        {ONEHOUR, 0.76, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {ONEHOUR, 0.76, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // overcommit with single reservation
+        {ONEHOUR, 1.1, 1, null, PlanningQuotaException.class },
+        {ONEHOUR, 1.1, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {ONEHOUR, 1.1, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // barely fit with multiple reservations (instantaneously, lowering to
+        // 1min to fit integral)
+        {ONEMINUTE, 0.25, 3, null, null },
+        {ONEMINUTE, 0.25, 3, TWODAYPERIOD, null },
+        {ONEMINUTE, 0.25, 3, ONEDAYPERIOD, null },
+
+        // overcommit with multiple reservations (instantaneously)
+        {ONEMINUTE, 0.25, 4, null, PlanningQuotaException.class },
+        {ONEMINUTE, 0.25, 4, TWODAYPERIOD, PlanningQuotaException.class },
+        {ONEMINUTE, 0.25, 4, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // (non-periodic) reservation longer than window
+        {25 * ONEHOUR, 0.25, 1, null, PlanningQuotaException.class },
+        {25 * ONEHOUR, 0.25, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {25 * ONEHOUR, 0.25, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // (non-periodic) reservation longer than window
+        {25 * ONEHOUR, 0.05, 5, null, PlanningQuotaException.class },
+        {25 * ONEHOUR, 0.05, 5, TWODAYPERIOD, PlanningQuotaException.class },
+        {25 * ONEHOUR, 0.05, 5, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // overcommit integral
+        {ONEDAY, 0.26, 1, null, PlanningQuotaException.class },
+        {2 * ONEHOUR, 0.26, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {2 * ONEDAY, 0.26, 1, ONEDAYPERIOD, PlanningQuotaException.class },
+
+        // overcommit integral
+        {ONEDAY / 2, 0.51, 1, null, PlanningQuotaException.class },
+        {2 * ONEHOUR / 2, 0.51, 1, TWODAYPERIOD,
+            PlanningQuotaException.class },
+        {2 * ONEDAY / 2, 0.51, 1, ONEDAYPERIOD, PlanningQuotaException.class }
+
+    });
+  }
 
-public class TestCapacityOverTimePolicy {
-
-  long timeWindow;
-  long step;
-  float avgConstraint;
-  float instConstraint;
-  long initTime;
-
-  InMemoryPlan plan;
-  ReservationAgent mAgent;
-  Resource minAlloc;
-  ResourceCalculator res;
-  Resource maxAlloc;
-
-  int totCont = 1000000;
-
-  @Before
-  public void setup() throws Exception {
+  @Override
+  public SharingPolicy getInitializedPolicy() {
 
     // 24h window
-    timeWindow = 86400000L;
+    long timeWindow = 86400000L;
+
     // 1 sec step
-    step = 1000L;
+    long step = 1000L;
 
     // 25% avg cap on capacity
-    avgConstraint = 25;
+    float avgConstraint = 25;
 
     // 70% instantaneous cap on capacity
-    instConstraint = 70;
+    float instConstraint = 75;
 
-    initTime = System.currentTimeMillis();
-    minAlloc = Resource.newInstance(1024, 1);
-    res = new DefaultResourceCalculator();
-    maxAlloc = Resource.newInstance(1024 * 8, 8);
-
-    mAgent = mock(ReservationAgent.class);
-    QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
     String reservationQ =
         ReservationSystemTestUtil.getFullReservationQueueName();
-    Resource clusterResource =
-        ReservationSystemTestUtil.calculateClusterResource(totCont);
-    ReservationSchedulerConfiguration conf =
-        ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
-            instConstraint, avgConstraint);
+    conf = ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
+        instConstraint, avgConstraint);
     CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
     policy.init(reservationQ, conf);
-    RMContext context = ReservationSystemTestUtil.createMockRMContext();
-
-    plan =
-        new InMemoryPlan(rootQueueMetrics, policy, mAgent,
-            clusterResource, step, res, minAlloc, maxAlloc,
-            "dedicated", null, true, context);
-  }
-
-  public int[] generateData(int length, int val) {
-    int[] data = new int[length];
-    for (int i = 0; i < length; i++) {
-      data[i] = val;
-    }
-    return data;
-  }
-
-  @Test
-  public void testSimplePass() throws IOException, PlanningException {
-    // generate allocation that simply fit within all constraints
-    int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
-
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-  }
-
-  @Test(expected = PlanningException.class)
-  public void testAllocationLargerThanValidWindow() throws IOException,
-      PlanningException {
-    // generate allocation that exceed the validWindow
-    int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont));
-
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
+    return policy;
   }
 
   @Test
-  public void testSimplePass2() throws IOException, PlanningException {
-    // generate allocation from single tenant that exceed avg momentarily but
-    // fit within
-    // max instantanesou
-    int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-  }
-
-  @Test
-  public void testMultiTenantPass() throws IOException, PlanningException {
-    // generate allocation from multiple tenants that barely fit in tot capacity
-    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    for (int i = 0; i < 4; i++) {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-    }
-  }
-
-  @Test(expected = PlanningQuotaException.class)
-  public void testMultiTenantFail() throws IOException, PlanningException {
-    // generate allocation from multiple tenants that exceed tot capacity
-    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    for (int i = 0; i < 5; i++) {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-    }
-  }
-
-  @Test(expected = PlanningQuotaException.class)
-  public void testInstFail() throws IOException, PlanningException {
-    // generate allocation that exceed the instantaneous cap single-show
-    int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-    Assert.fail("should not have accepted this");
-  }
-
-  @Test
-  public void testInstFailBySum() throws IOException, PlanningException {
-    // generate allocation that exceed the instantaneous cap by sum
-    int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-    try {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-      Assert.fail();
-    } catch (PlanningQuotaException p) {
-      // expected
-    }
-  }
-
-  @Test(expected = PlanningQuotaException.class)
-  public void testFailAvg() throws IOException, PlanningException {
-    // generate an allocation which violates the 25% average single-shot
-    Map<ReservationInterval, Resource> req =
-        new TreeMap<ReservationInterval, Resource>();
-    long win = timeWindow / 2 + 100;
-    int cont = (int) Math.ceil(0.5 * totCont);
-    req.put(new ReservationInterval(initTime, initTime + win),
-        ReservationSystemUtil.toResource(
-            ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-                cont)));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + win, win);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + win, req, res, minAlloc), false));
-  }
-
-  @Test
-  public void testFailAvgBySum() throws IOException, PlanningException {
-    // generate an allocation which violates the 25% average by sum
-    Map<ReservationInterval, Resource> req =
-        new TreeMap<ReservationInterval, Resource>();
-    long win = 86400000 / 4 + 1;
-    int cont = (int) Math.ceil(0.5 * totCont);
-    req.put(new ReservationInterval(initTime, initTime + win),
-        ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
-            .newInstance(1024, 1), cont)));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + win, win);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + win, req, res, minAlloc), false));
-
-    try {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-              "dedicated", initTime, initTime + win, req, res, minAlloc), false));
-
-      Assert.fail("should not have accepted this");
-    } catch (PlanningQuotaException e) {
-      // expected
-    }
+  public void testAllocation() throws IOException, PlanningException {
+    runTest();
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c051dbe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
index c5edaf0..accdf24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
@@ -6,9 +6,9 @@
  *   to you under the Apache License, Version 2.0 (the
  *   "License"); you may not use this file except in compliance
  *   with the License.  You may obtain a copy of the License at
- *  
+ *
  *       http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *   Unless required by applicable law or agreed to in writing, software
  *   distributed under the License is distributed on an "AS IS" BASIS,
  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,145 +17,70 @@
  *******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.junit.Before;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * This class tests the {@code NoOverCommitPolicy} sharing policy.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
+public class TestNoOverCommitPolicy extends BaseSharingPolicyTest {
+
+  final static long ONEHOUR = 3600 * 1000;
+  final static String TWOHOURPERIOD = "7200000";
+
+  @Parameterized.Parameters(name = "Duration {0}, height {1}," +
+          " submissions {2}, periodic {3})")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+
+        // easy fit
+        {ONEHOUR, 0.25, 1, null, null },
+        {ONEHOUR, 0.25, 1, TWOHOURPERIOD, null },
+
+        // barely fit
+        {ONEHOUR, 1, 1, null, null },
+        {ONEHOUR, 1, 1, TWOHOURPERIOD, null },
+
+        // overcommit with single reservation
+        {ONEHOUR, 1.1, 1, null, ResourceOverCommitException.class },
+        {ONEHOUR, 1.1, 1, TWOHOURPERIOD, ResourceOverCommitException.class },
+
+        // barely fit with multiple reservations
+        {ONEHOUR, 0.25, 4, null, null },
+        {ONEHOUR, 0.25, 4, TWOHOURPERIOD, null },
+
+        // overcommit with multiple reservations
+        {ONEHOUR, 0.25, 5, null, ResourceOverCommitException.class },
+        {ONEHOUR, 0.25, 5, TWOHOURPERIOD, ResourceOverCommitException.class }
+
+    });
+  }
 
-public class TestNoOverCommitPolicy {
-
-  long step;
-  long initTime;
-
-  InMemoryPlan plan;
-  ReservationAgent mAgent;
-  Resource minAlloc;
-  ResourceCalculator res;
-  Resource maxAlloc;
-
-  int totCont = 1000000;
-
-  @Before
-  public void setup() throws Exception {
-
-    // 1 sec step
-    step = 1000L;
-
-    initTime = System.currentTimeMillis();
-    minAlloc = Resource.newInstance(1024, 1);
-    res = new DefaultResourceCalculator();
-    maxAlloc = Resource.newInstance(1024 * 8, 8);
-
-    mAgent = mock(ReservationAgent.class);
+  @Override
+  public SharingPolicy getInitializedPolicy() {
     String reservationQ =
         ReservationSystemTestUtil.getFullReservationQueueName();
-    QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
-    Resource clusterResource =
-        ReservationSystemTestUtil.calculateClusterResource(totCont);
-    ReservationSchedulerConfiguration conf = mock
-        (ReservationSchedulerConfiguration.class);
-    NoOverCommitPolicy policy = new NoOverCommitPolicy();
+    conf = new CapacitySchedulerConfiguration();
+    SharingPolicy policy = new NoOverCommitPolicy();
     policy.init(reservationQ, conf);
-    RMContext context = ReservationSystemTestUtil.createMockRMContext();
-
-    plan =
-        new InMemoryPlan(rootQueueMetrics, policy, mAgent,
-            clusterResource, step, res, minAlloc, maxAlloc,
-            "dedicated", null, true, context);
-  }
-
-  public int[] generateData(int length, int val) {
-    int[] data = new int[length];
-    for (int i = 0; i < length; i++) {
-      data[i] = val;
-    }
-    return data;
-  }
-
-  @Test
-  public void testSingleUserEasyFitPass() throws IOException, PlanningException {
-    // generate allocation that easily fit within resource constraints
-    int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-  }
-
-  @Test
-  public void testSingleUserBarelyFitPass() throws IOException,
-      PlanningException {
-    // generate allocation from single tenant that barely fit
-    int[] f = generateData(3600, totCont);
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + f.length,
-            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc), false));
-  }
-
-  @Test(expected = ResourceOverCommitException.class)
-  public void testSingleFail() throws IOException, PlanningException {
-    // generate allocation from single tenant that exceed capacity
-    int[] f = generateData(3600, (int) (1.1 * totCont));
-    plan.addReservation(new InMemoryReservationAllocation(
-        ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-        "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
-            .generateAllocation(initTime, step, f), res, minAlloc), false);
+    return policy;
   }
 
   @Test
-  public void testMultiTenantPass() throws IOException, PlanningException {
-    // generate allocation from multiple tenants that barely fit in tot capacity
-    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    for (int i = 0; i < 4; i++) {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-    }
+  public void testAllocation() throws IOException, PlanningException {
+    runTest();
   }
 
-  @Test(expected = ResourceOverCommitException.class)
-  public void testMultiTenantFail() throws IOException, PlanningException {
-    // generate allocation from multiple tenants that exceed tot capacity
-    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            initTime, initTime + f.length + 1, f.length);
-    for (int i = 0; i < 5; i++) {
-      assertTrue(plan.toString(),
-          plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
-              "dedicated", initTime, initTime + f.length,
-              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc), false));
-    }
-  }
-}
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org