Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:31 UTC

[16/33] git commit: YARN-2475. Logic for responding to capacity drops for the ReservationSystem. Contributed by Carlo Curino and Subru Krishnan. (cherry picked from commit f83a07f266f2c5e6eead554d8a331ed7e75e10d5) (cherry picked from commit 1c6950354f3c3

YARN-2475. Logic for responding to capacity drops for the ReservationSystem. Contributed by Carlo Curino and Subru Krishnan.
(cherry picked from commit f83a07f266f2c5e6eead554d8a331ed7e75e10d5)
(cherry picked from commit 1c6950354f3c35a7824770dc251d5aec3be4876a)
(cherry picked from commit b81f571e6064cee119bcbfed090ab646cdd927a7)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/950da32b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/950da32b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/950da32b

Branch: refs/heads/branch-2.6
Commit: 950da32b4c08865cac5d95c4ed5fa9576e271e5b
Parents: 5ac08c5
Author: carlo curino <Carlo Curino>
Authored: Fri Sep 12 16:52:54 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Oct 6 10:29:12 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                           |   5 +
 .../reservation/SimpleCapacityReplanner.java    |  95 ++++++++++++
 .../TestSimpleCapacityReplanner.java            | 149 +++++++++++++++++++
 3 files changed, 249 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/950da32b/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
new file mode 100644
index 0000000..9fd4b3b
--- /dev/null
+++ b/YARN-1051-CHANGES.txt
@@ -0,0 +1,5 @@
+YARN-1707. Introduce APIs to add/remove/resize queues in the
+CapacityScheduler. (Carlo Curino and Subru Krishnan via curino)
+
+YARN-2475. Logic for responding to capacity drops for the 
+ReservationSystem. (Carlo Curino and Subru Krishnan via curino)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/950da32b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
new file mode 100644
index 0000000..8384538
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
@@ -0,0 +1,95 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This (re)planner scans a period of time from now to a maximum time window
+ * (or the end of the last session, whichever comes first), checking that the
+ * overall capacity is not violated.
+ * 
+ * It greedily removes sessions in reverse order of acceptance (the latest
+ * accepted is the first removed).
+ */
+public class SimpleCapacityReplanner implements Planner {
+
+  private static final Log LOG = LogFactory
+      .getLog(SimpleCapacityReplanner.class);
+
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+  private final Clock clock;
+
+  // this limits the time span of the replanning: time instants far into
+  // the future might be worth replanning for later on, so we only look
+  // ahead up to the enforcement window
+  private long lengthOfCheckZone;
+
+  public SimpleCapacityReplanner() {
+    this(new UTCClock());
+  }
+
+  @VisibleForTesting
+  SimpleCapacityReplanner(Clock clock) {
+    this.clock = clock;
+  }
+
+  @Override
+  public void init(String planQueueName, CapacitySchedulerConfiguration conf) {
+    this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
+  }
+
+  @Override
+  public void plan(Plan plan, List<ReservationDefinition> contracts)
+      throws PlanningException {
+
+    if (contracts != null) {
+      throw new RuntimeException(
+          "SimpleCapacityReplanner cannot handle new reservation contracts");
+    }
+
+    ResourceCalculator resCalc = plan.getResourceCalculator();
+    Resource totCap = plan.getTotalCapacity();
+    long now = clock.getTime();
+
+    // loop over all moments in time from now to the end of the check zone,
+    // or the end of the planned sessions, whichever comes first
+    for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t +=
+        plan.getStep()) {
+      Resource excessCap =
+          Resources.subtract(plan.getTotalCommittedResources(t), totCap);
+      // if the committed resources at time t violate the plan capacity
+      if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
+        // sorted in reverse order of acceptance, so newest reservations first
+        Set<ReservationAllocation> curReservations =
+            new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
+        for (Iterator<ReservationAllocation> resIter =
+            curReservations.iterator(); resIter.hasNext()
+            && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) {
+          ReservationAllocation reservation = resIter.next();
+          plan.deleteReservation(reservation.getReservationId());
+          excessCap =
+              Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
+          LOG.info("Removing reservation " + reservation.getReservationId()
+              + " to repair physical-resource constraints in the plan: "
+              + plan.getQueueName());
+        }
+      }
+    }
+  }
+}
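
For readers skimming the patch, the greedy repair above boils down to the
following self-contained sketch (plain integers stand in for Resource
objects; the class and variable names are illustrative only and not part of
this patch):

    import java.util.Arrays;

    public class GreedyTrimSketch {
      public static void main(String[] args) {
        int capacity = 70;                          // plan capacity after the drop
        int[] committed = { 20, 20, 20, 20, 20 };   // per-reservation load at time t,
                                                    // oldest to newest accepted
        int total = Arrays.stream(committed).sum(); // 100 > 70: capacity is violated
        // drop newest-first until the violation disappears
        for (int i = committed.length - 1; i >= 0 && total > capacity; i--) {
          total -= committed[i];
          System.out.println("dropping reservation " + i + ", committed now " + total);
        }
        // total is now 60 <= 70: the two newest reservations were removed,
        // mirroring r4 and r5 being deleted in the test that follows
      }
    }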

http://git-wip-us.apache.org/repos/asf/hadoop/blob/950da32b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
new file mode 100644
index 0000000..f2313e6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
@@ -0,0 +1,149 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Test;
+
+public class TestSimpleCapacityReplanner {
+
+  @Test
+  public void testReplanningPlanCapacityLoss() throws PlanningException {
+
+    Resource clusterCapacity = Resource.newInstance(100 * 1024, 10);
+    Resource minAlloc = Resource.newInstance(1024, 1);
+    Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+    ResourceCalculator res = new DefaultResourceCalculator();
+    long step = 1L;
+    Clock clock = mock(Clock.class);
+    ReservationAgent agent = mock(ReservationAgent.class);
+
+    SharingPolicy policy = new NoOverCommitPolicy();
+    policy.init("root.dedicated", null, new HashSet<String>());
+
+    QueueMetrics queueMetrics = mock(QueueMetrics.class);
+
+    when(clock.getTime()).thenReturn(0L);
+    SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
+
+    CapacitySchedulerConfiguration conf =
+        mock(CapacitySchedulerConfiguration.class);
+    when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
+
+    conf.setLong(CapacitySchedulerConfiguration.PREFIX + "blah"
+        + CapacitySchedulerConfiguration.DOT
+        + CapacitySchedulerConfiguration.RESERVATION_ENFORCEMENT_WINDOW, 6);
+    enf.init("blah", conf);
+
+    // Initialize the plan with more resources
+    InMemoryPlan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+            res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
+
+    // add reservations filling the plan (separated by 1ms, so we are sure
+    // each one follows the previous in acceptance order)
+    long ts = System.currentTimeMillis();
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    int[] f5 = { 20, 20, 20, 20, 20 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(1L);
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(2L);
+    ReservationId r3 = ReservationId.newInstance(ts, 3);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(3L);
+    ReservationId r4 = ReservationId.newInstance(ts, 4);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(4L);
+    ReservationId r5 = ReservationId.newInstance(ts, 5);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+
+    int[] f6 = { 50, 50, 50, 50, 50 };
+    ReservationId r6 = ReservationId.newInstance(ts, 6);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
+            "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(6L);
+    ReservationId r7 = ReservationId.newInstance(ts, 7);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
+            "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+            minAlloc)));
+
+    // remove some of the resources (requires replanning)
+    plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));
+
+    when(clock.getTime()).thenReturn(0L);
+
+    // run the replanner
+    enf.plan(plan, null);
+
+    // check which reservations are still present
+    assertNotNull(plan.getReservationById(r1));
+    assertNotNull(plan.getReservationById(r2));
+    assertNotNull(plan.getReservationById(r3));
+    assertNotNull(plan.getReservationById(r6));
+    assertNotNull(plan.getReservationById(r7));
+
+    // and which ones are removed
+    assertNull(plan.getReservationById(r4));
+    assertNull(plan.getReservationById(r5));
+
+    // within the enforcement window, the committed resources at each time
+    // step must no longer exceed the reduced capacity
+    for (int i = 0; i < 6; i++) {
+      int tot = 0;
+      for (ReservationAllocation r : plan.getReservationsAtTime(i)) {
+        tot += r.getResourcesAtTime(i).getMemory();
+      }
+      assertTrue(tot <= 70 * 1024);
+    }
+  }
+
+  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+      int startTime, int[] alloc) {
+    Map<ReservationInterval, ReservationRequest> req =
+        new TreeMap<ReservationInterval, ReservationRequest>();
+    for (int i = 0; i < alloc.length; i++) {
+      req.put(new ReservationInterval(startTime + i, startTime + i + 1),
+          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+              alloc[i]));
+    }
+    return req;
+  }
+
+}
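
In the test above the enforcement window comes from a mocked
CapacitySchedulerConfiguration. As a rough, non-authoritative sketch of the
same wiring without mocks, built only from constants the test already
references (whether getEnforcementWindow resolves exactly this key is an
assumption here, not something this patch states):

    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // assumed key layout, mirroring the setLong call in the test above
    String key = CapacitySchedulerConfiguration.PREFIX + "dedicated"
        + CapacitySchedulerConfiguration.DOT
        + CapacitySchedulerConfiguration.RESERVATION_ENFORCEMENT_WINDOW;
    conf.setLong(key, 6L);                 // look ahead six time steps
    SimpleCapacityReplanner replanner = new SimpleCapacityReplanner();
    replanner.init("dedicated", conf);
    // replanner.plan(plan, null) then trims reservations, newest first, wherever
    // the committed resources inside the window exceed the plan capacity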