You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2015/07/25 16:49:40 UTC
[1/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement
Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
(cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 621203bf4 -> 26ea04581
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
new file mode 100644
index 0000000..bd18a2f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
@@ -0,0 +1,611 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestGreedyReservationAgent {
+
+ ReservationAgent agent;
+ InMemoryPlan plan;
+ Resource minAlloc = Resource.newInstance(1024, 1);
+ ResourceCalculator res = new DefaultResourceCalculator();
+ Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+ Random rand = new Random();
+ long step;
+
+ @Before
+ public void setup() throws Exception {
+
+ long seed = rand.nextLong();
+ rand.setSeed(seed);
+ Log.info("Running with seed: " + seed);
+
+ // setting completely loose quotas
+ long timeWindow = 1000000L;
+ Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
+ step = 1000L;
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ String reservationQ = testUtil.getFullReservationQueueName();
+
+ float instConstraint = 100;
+ float avgConstraint = 100;
+
+ ReservationSchedulerConfiguration conf =
+ ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
+ instConstraint, avgConstraint);
+ CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+ policy.init(reservationQ, conf);
+ agent = new GreedyReservationAgent();
+
+ QueueMetrics queueMetrics = mock(QueueMetrics.class);
+
+ plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+ res, minAlloc, maxAlloc, "dedicated", null, true);
+ }
+
+ @SuppressWarnings("javadoc")
+ @Test
+ public void testSimple() throws PlanningException {
+
+ prepareBasicPlan();
+
+ // create a request with a single atomic ask
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(5 * step);
+ rr.setDeadline(20 * step);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 5, 10 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setReservationResources(Collections.singletonList(r));
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ for (long i = 10 * step; i < 20 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 10, 2 * 10)));
+ }
+
+ }
+
+ @Test
+ public void testOrder() throws PlanningException {
+ prepareBasicPlan();
+
+ // create a completely utilized segment around time 30
+ int[] f = { 100, 100 };
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 30 * step, 30 * step + f.length * step,
+ ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+ res, minAlloc)));
+
+ // create a chain of 4 RR, mixing gang and non-gang
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(0 * step);
+ rr.setDeadline(70 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 1, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 10, 10, 20 * step);
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 4);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
+
+ System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testOrderNoGapImpossible() throws PlanningException {
+ prepareBasicPlan();
+ // create a completely utilized segment at time 30
+ int[] f = { 100, 100 };
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 30 * step, 30 * step + f.length * step,
+ ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+ res, minAlloc)));
+
+ // create a chain of 4 RR, mixing gang and non-gang
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(0L);
+
+ rr.setDeadline(70L);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 1, 10);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 10, 10, 20);
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ boolean result = false;
+ try {
+ // submit to agent
+ result = agent.createReservation(reservationID, "u1", plan, rr);
+ fail();
+ } catch (PlanningException p) {
+ // expected
+ }
+
+ // validate
+ assertFalse("Agent-based allocation should have failed", result);
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == 3);
+
+ System.out
+ .println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testOrderNoGap() throws PlanningException {
+ prepareBasicPlan();
+ // create a chain of 4 RR, mixing gang and non-gang
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(0 * step);
+ rr.setDeadline(60 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 1, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 10, 10, 20 * step);
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ // validate
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1));
+
+ }
+
+ @Test
+ public void testSingleSliding() throws PlanningException {
+ prepareBasicPlan();
+
+ // create a single request for which we need subsequent (tight) packing.
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100 * step);
+ rr.setDeadline(120 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 200, 10, 10 * step);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate results, we expect the second one to be accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1));
+
+ System.out.println("--------AFTER packed ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAny() throws PlanningException {
+ prepareBasicPlan();
+ // create an ANY request, with an impossible step (last in list, first
+ // considered),
+ // and two satisfiable ones. We expect the second one to be returned.
+
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100 * step);
+ rr.setDeadline(120 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 5, 5, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 5, 10 * step);
+ ReservationRequest r3 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 110, 110, 10 * step);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r3);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ boolean res = agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate results, we expect the second one to be accepted
+ assertTrue("Agent-based allocation failed", res);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
+
+ System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+ + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAnyImpossible() throws PlanningException {
+ prepareBasicPlan();
+ // create an ANY request, with all impossible alternatives
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100L);
+ rr.setDeadline(120L);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
+
+ // longer than arrival-deadline
+ ReservationRequest r1 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 35, 5, 30);
+ // above max cluster size
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 110, 110, 10);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r1);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ boolean result = false;
+ try {
+ // submit to agent
+ result = agent.createReservation(reservationID, "u1", plan, rr);
+ fail();
+ } catch (PlanningException p) {
+ // expected
+ }
+ // validate results, we expect the second one to be accepted
+ assertFalse("Agent-based allocation should have failed", result);
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == 2);
+
+ System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAll() throws PlanningException {
+ prepareBasicPlan();
+ // create an ALL request
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100 * step);
+ rr.setDeadline(120 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 5, 5, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 10, 20 * step);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate results, we expect the second one to be accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
+
+ System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
+ + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAllImpossible() throws PlanningException {
+ prepareBasicPlan();
+ // create an ALL request, with an impossible combination, it should be
+ // rejected, and allocation remain unchanged
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100L);
+ rr.setDeadline(120L);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 55, 5, 10);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 55, 5, 20);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ boolean result = false;
+ try {
+ // submit to agent
+ result = agent.createReservation(reservationID, "u1", plan, rr);
+ fail();
+ } catch (PlanningException p) {
+ // expected
+ }
+
+ // validate results, we expect the second one to be accepted
+ assertFalse("Agent-based allocation failed", result);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 2);
+
+ System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ private void prepareBasicPlan() throws PlanningException {
+
+ // insert in the reservation a couple of controlled reservations, to create
+ // conditions for assignment that are non-empty
+
+ int[] f = { 10, 10, 20, 20, 20, 10, 10 };
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
+ .generateAllocation(0, step, f), res, minAlloc)));
+
+ int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
+ Map<ReservationInterval, Resource> alloc =
+ ReservationSystemTestUtil.generateAllocation(5000, step, f2);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
+
+ System.out.println("--------BEFORE AGENT----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+ }
+
+ private boolean check(ReservationAllocation cs, long start, long end,
+ int containers, int mem, int cores) {
+
+ boolean res = true;
+ for (long i = start; i < end; i++) {
+ res = res
+ && Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(mem * containers, cores * containers));
+ }
+ return res;
+ }
+
+ public void testStress(int numJobs) throws PlanningException, IOException {
+
+ long timeWindow = 1000000L;
+ Resource clusterCapacity = Resource.newInstance(500 * 100 * 1024, 500 * 32);
+ step = 1000L;
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
+ String reservationQ = testUtil.getFullReservationQueueName();
+ float instConstraint = 100;
+ float avgConstraint = 100;
+ ReservationSchedulerConfiguration conf =
+ ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
+ instConstraint, avgConstraint);
+ CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+ policy.init(reservationQ, conf);
+
+ plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
+ clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
+
+ int acc = 0;
+ List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
+ for (long i = 0; i < numJobs; i++) {
+ list.add(ReservationSystemTestUtil.generateRandomRR(rand, i));
+ }
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < numJobs; i++) {
+
+ try {
+ if (agent.createReservation(
+ ReservationSystemTestUtil.getNewReservationId(), "u" + i % 100,
+ plan, list.get(i))) {
+ acc++;
+ }
+ } catch (PlanningException p) {
+ // ignore exceptions
+ }
+ }
+
+ long end = System.currentTimeMillis();
+ System.out.println("Submitted " + numJobs + " jobs " + " accepted " + acc
+ + " in " + (end - start) + "ms");
+ }
+
+ public static void main(String[] arg) {
+
+ // run a stress test with by default 1000 random jobs
+ int numJobs = 1000;
+ if (arg.length > 0) {
+ numJobs = Integer.parseInt(arg[0]);
+ }
+
+ try {
+ TestGreedyReservationAgent test = new TestGreedyReservationAgent();
+ test.setup();
+ test.testStress(numJobs);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
new file mode 100644
index 0000000..aeb1e6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
@@ -0,0 +1,170 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Test;
+
+public class TestSimpleCapacityReplanner {
+
+ @Test
+ public void testReplanningPlanCapacityLoss() throws PlanningException {
+
+ Resource clusterCapacity = Resource.newInstance(100 * 1024, 10);
+ Resource minAlloc = Resource.newInstance(1024, 1);
+ Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+ ResourceCalculator res = new DefaultResourceCalculator();
+ long step = 1L;
+ Clock clock = mock(Clock.class);
+ ReservationAgent agent = mock(ReservationAgent.class);
+
+ SharingPolicy policy = new NoOverCommitPolicy();
+ policy.init("root.dedicated", null);
+
+ QueueMetrics queueMetrics = mock(QueueMetrics.class);
+
+ when(clock.getTime()).thenReturn(0L);
+ SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
+
+ ReservationSchedulerConfiguration conf =
+ mock(ReservationSchedulerConfiguration.class);
+ when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
+
+ enf.init("blah", conf);
+
+ // Initialize the plan with more resources
+ InMemoryPlan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+ res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
+
+ // add reservation filling the plan (separating them 1ms, so we are sure
+ // s2 follows s1 on acceptance
+ long ts = System.currentTimeMillis();
+ ReservationId r1 = ReservationId.newInstance(ts, 1);
+ int[] f5 = { 20, 20, 20, 20, 20 };
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(1L);
+ ReservationId r2 = ReservationId.newInstance(ts, 2);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(2L);
+ ReservationId r3 = ReservationId.newInstance(ts, 3);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(3L);
+ ReservationId r4 = ReservationId.newInstance(ts, 4);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(4L);
+ ReservationId r5 = ReservationId.newInstance(ts, 5);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+
+ int[] f6 = { 50, 50, 50, 50, 50 };
+ ReservationId r6 = ReservationId.newInstance(ts, 6);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
+ "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(6L);
+ ReservationId r7 = ReservationId.newInstance(ts, 7);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
+ "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+ minAlloc)));
+
+ // remove some of the resources (requires replanning)
+ plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));
+
+ when(clock.getTime()).thenReturn(0L);
+
+ // run the replanner
+ enf.plan(plan, null);
+
+ // check which reservation are still present
+ assertNotNull(plan.getReservationById(r1));
+ assertNotNull(plan.getReservationById(r2));
+ assertNotNull(plan.getReservationById(r3));
+ assertNotNull(plan.getReservationById(r6));
+ assertNotNull(plan.getReservationById(r7));
+
+ // and which ones are removed
+ assertNull(plan.getReservationById(r4));
+ assertNull(plan.getReservationById(r5));
+
+ // check resources at each moment in time no more exceed capacity
+ for (int i = 0; i < 20; i++) {
+ int tot = 0;
+ for (ReservationAllocation r : plan.getReservationsAtTime(i)) {
+ tot = r.getResourcesAtTime(i).getMemory();
+ }
+ assertTrue(tot <= 70 * 1024);
+ }
+ }
+
+ private Map<ReservationInterval, Resource> generateAllocation(
+ int startTime, int[] alloc) {
+ Map<ReservationInterval, Resource> req =
+ new TreeMap<ReservationInterval, Resource>();
+ for (int i = 0; i < alloc.length; i++) {
+ req.put(new ReservationInterval(startTime + i, startTime + i + 1),
+ ReservationSystemUtil.toResource(
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ alloc[i])));
+ }
+ return req;
+ }
+
+}
[2/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement
Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
(cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)
Posted by cu...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
deleted file mode 100644
index de94dcd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
+++ /dev/null
@@ -1,604 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.ReservationRequests;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Before;
-import org.junit.Test;
-import org.mortbay.log.Log;
-
-public class TestGreedyReservationAgent {
-
- ReservationAgent agent;
- InMemoryPlan plan;
- Resource minAlloc = Resource.newInstance(1024, 1);
- ResourceCalculator res = new DefaultResourceCalculator();
- Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
- Random rand = new Random();
- long step;
-
- @Before
- public void setup() throws Exception {
-
- long seed = rand.nextLong();
- rand.setSeed(seed);
- Log.info("Running with seed: " + seed);
-
- // setting completely loose quotas
- long timeWindow = 1000000L;
- Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
- step = 1000L;
- ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
- String reservationQ = testUtil.getFullReservationQueueName();
-
- float instConstraint = 100;
- float avgConstraint = 100;
-
- ReservationSchedulerConfiguration conf =
- ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
- instConstraint, avgConstraint);
- CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
- policy.init(reservationQ, conf);
- agent = new GreedyReservationAgent();
-
- QueueMetrics queueMetrics = mock(QueueMetrics.class);
-
- plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
- res, minAlloc, maxAlloc, "dedicated", null, true);
- }
-
- @SuppressWarnings("javadoc")
- @Test
- public void testSimple() throws PlanningException {
-
- prepareBasicPlan();
-
- // create a request with a single atomic ask
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
- rr.setArrival(5 * step);
- rr.setDeadline(20 * step);
- ReservationRequest r = ReservationRequest.newInstance(
- Resource.newInstance(2048, 2), 10, 5, 10 * step);
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setReservationResources(Collections.singletonList(r));
- rr.setReservationRequests(reqs);
-
- ReservationId reservationID = ReservationSystemTestUtil
- .getNewReservationId();
- agent.createReservation(reservationID, "u1", plan, rr);
-
- assertTrue("Agent-based allocation failed", reservationID != null);
- assertTrue("Agent-based allocation failed", plan.getAllReservations()
- .size() == 3);
-
- ReservationAllocation cs = plan.getReservationById(reservationID);
-
- System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
- + reservationID + ")----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
-
- for (long i = 10 * step; i < 20 * step; i++) {
- assertTrue(
- "Agent-based allocation unexpected",
- Resources.equals(cs.getResourcesAtTime(i),
- Resource.newInstance(2048 * 10, 2 * 10)));
- }
-
- }
-
- @Test
- public void testOrder() throws PlanningException {
- prepareBasicPlan();
-
- // create a completely utilized segment around time 30
- int[] f = { 100, 100 };
-
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), null, "u1",
- "dedicated", 30 * step, 30 * step + f.length * step,
- ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
- res, minAlloc)));
-
- // create a chain of 4 RR, mixing gang and non-gang
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
- rr.setArrival(0 * step);
- rr.setDeadline(70 * step);
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
- ReservationRequest r = ReservationRequest.newInstance(
- Resource.newInstance(2048, 2), 10, 1, 10 * step);
- ReservationRequest r2 = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 10, 10, 20 * step);
- List<ReservationRequest> list = new ArrayList<ReservationRequest>();
- list.add(r);
- list.add(r2);
- list.add(r);
- list.add(r2);
- reqs.setReservationResources(list);
- rr.setReservationRequests(reqs);
-
- // submit to agent
- ReservationId reservationID = ReservationSystemTestUtil
- .getNewReservationId();
- agent.createReservation(reservationID, "u1", plan, rr);
-
- // validate
- assertTrue("Agent-based allocation failed", reservationID != null);
- assertTrue("Agent-based allocation failed", plan.getAllReservations()
- .size() == 4);
-
- ReservationAllocation cs = plan.getReservationById(reservationID);
-
- assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
- assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
- assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
- assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
-
- System.out.println("--------AFTER ORDER ALLOCATION (queue: "
- + reservationID + ")----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
-
- }
-
- @Test
- public void testOrderNoGapImpossible() throws PlanningException {
- prepareBasicPlan();
- // create a completely utilized segment at time 30
- int[] f = { 100, 100 };
-
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), null, "u1",
- "dedicated", 30 * step, 30 * step + f.length * step,
- ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
- res, minAlloc)));
-
- // create a chain of 4 RR, mixing gang and non-gang
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
- rr.setArrival(0L);
-
- rr.setDeadline(70L);
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
- ReservationRequest r = ReservationRequest.newInstance(
- Resource.newInstance(2048, 2), 10, 1, 10);
- ReservationRequest r2 = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 10, 10, 20);
- List<ReservationRequest> list = new ArrayList<ReservationRequest>();
- list.add(r);
- list.add(r2);
- list.add(r);
- list.add(r2);
- reqs.setReservationResources(list);
- rr.setReservationRequests(reqs);
-
- ReservationId reservationID = ReservationSystemTestUtil
- .getNewReservationId();
- boolean result = false;
- try {
- // submit to agent
- result = agent.createReservation(reservationID, "u1", plan, rr);
- fail();
- } catch (PlanningException p) {
- // expected
- }
-
- // validate
- assertFalse("Agent-based allocation should have failed", result);
- assertTrue("Agent-based allocation should have failed", plan
- .getAllReservations().size() == 3);
-
- System.out
- .println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: "
- + reservationID + ")----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
-
- }
-
- @Test
- public void testOrderNoGap() throws PlanningException {
- prepareBasicPlan();
- // create a chain of 4 RR, mixing gang and non-gang
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
- rr.setArrival(0 * step);
- rr.setDeadline(60 * step);
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
- ReservationRequest r = ReservationRequest.newInstance(
- Resource.newInstance(2048, 2), 10, 1, 10 * step);
- ReservationRequest r2 = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 10, 10, 20 * step);
- List<ReservationRequest> list = new ArrayList<ReservationRequest>();
- list.add(r);
- list.add(r2);
- list.add(r);
- list.add(r2);
- reqs.setReservationResources(list);
- rr.setReservationRequests(reqs);
- rr.setReservationRequests(reqs);
-
- // submit to agent
- ReservationId reservationID = ReservationSystemTestUtil
- .getNewReservationId();
- agent.createReservation(reservationID, "u1", plan, rr);
-
- System.out.println("--------AFTER ORDER ALLOCATION (queue: "
- + reservationID + ")----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
-
- // validate
- assertTrue("Agent-based allocation failed", reservationID != null);
- assertTrue("Agent-based allocation failed", plan.getAllReservations()
- .size() == 3);
-
- ReservationAllocation cs = plan.getReservationById(reservationID);
-
- assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
- assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
- assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1));
- assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1));
-
- }
-
- @Test
- public void testSingleSliding() throws PlanningException {
- prepareBasicPlan();
-
- // create a single request for which we need subsequent (tight) packing.
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
- rr.setArrival(100 * step);
- rr.setDeadline(120 * step);
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
- ReservationRequest r = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 200, 10, 10 * step);
-
- List<ReservationRequest> list = new ArrayList<ReservationRequest>();
- list.add(r);
- reqs.setReservationResources(list);
- rr.setReservationRequests(reqs);
-
- // submit to agent
- ReservationId reservationID = ReservationSystemTestUtil
- .getNewReservationId();
- agent.createReservation(reservationID, "u1", plan, rr);
-
- // validate results, we expect the second one to be accepted
- assertTrue("Agent-based allocation failed", reservationID != null);
- assertTrue("Agent-based allocation failed", plan.getAllReservations()
- .size() == 3);
-
- ReservationAllocation cs = plan.getReservationById(reservationID);
-
- assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1));
-
- System.out.println("--------AFTER packed ALLOCATION (queue: "
- + reservationID + ")----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
-
- }
-
- @Test
- public void testAny() throws PlanningException {
- prepareBasicPlan();
- // create an ANY request, with an impossible step (last in list, first
- // considered),
- // and two satisfiable ones. We expect the second one to be returned.
-
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
- rr.setArrival(100 * step);
- rr.setDeadline(120 * step);
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
- ReservationRequest r = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 5, 5, 10 * step);
- ReservationRequest r2 = ReservationRequest.newInstance(
- Resource.newInstance(2048, 2), 10, 5, 10 * step);
- ReservationRequest r3 = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 110, 110, 10 * step);
-
- List<ReservationRequest> list = new ArrayList<ReservationRequest>();
- list.add(r);
- list.add(r2);
- list.add(r3);
- reqs.setReservationResources(list);
- rr.setReservationRequests(reqs);
-
- // submit to agent
- ReservationId reservationID = ReservationSystemTestUtil
- .getNewReservationId();
- boolean res = agent.createReservation(reservationID, "u1", plan, rr);
-
- // validate results, we expect the second one to be accepted
- assertTrue("Agent-based allocation failed", res);
- assertTrue("Agent-based allocation failed", plan.getAllReservations()
- .size() == 3);
-
- ReservationAllocation cs = plan.getReservationById(reservationID);
-
- assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
-
- System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
- + ")----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
-
- }
-
- @Test
- public void testAnyImpossible() throws PlanningException {
- prepareBasicPlan();
- // create an ANY request, with all impossible alternatives
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
- rr.setArrival(100L);
- rr.setDeadline(120L);
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
-
- // longer than arrival-deadline
- ReservationRequest r1 = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 35, 5, 30);
- // above max cluster size
- ReservationRequest r2 = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 110, 110, 10);
-
- List<ReservationRequest> list = new ArrayList<ReservationRequest>();
- list.add(r1);
- list.add(r2);
- reqs.setReservationResources(list);
- rr.setReservationRequests(reqs);
-
- ReservationId reservationID = ReservationSystemTestUtil
- .getNewReservationId();
- boolean result = false;
- try {
- // submit to agent
- result = agent.createReservation(reservationID, "u1", plan, rr);
- fail();
- } catch (PlanningException p) {
- // expected
- }
- // validate results, we expect the second one to be accepted
- assertFalse("Agent-based allocation should have failed", result);
- assertTrue("Agent-based allocation should have failed", plan
- .getAllReservations().size() == 2);
-
- System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: "
- + reservationID + ")----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
-
- }
-
- @Test
- public void testAll() throws PlanningException {
- prepareBasicPlan();
- // create an ALL request
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
- rr.setArrival(100 * step);
- rr.setDeadline(120 * step);
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
- ReservationRequest r = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 5, 5, 10 * step);
- ReservationRequest r2 = ReservationRequest.newInstance(
- Resource.newInstance(2048, 2), 10, 10, 20 * step);
-
- List<ReservationRequest> list = new ArrayList<ReservationRequest>();
- list.add(r);
- list.add(r2);
- reqs.setReservationResources(list);
- rr.setReservationRequests(reqs);
-
- // submit to agent
- ReservationId reservationID = ReservationSystemTestUtil
- .getNewReservationId();
- agent.createReservation(reservationID, "u1", plan, rr);
-
- // validate results, we expect the second one to be accepted
- assertTrue("Agent-based allocation failed", reservationID != null);
- assertTrue("Agent-based allocation failed", plan.getAllReservations()
- .size() == 3);
-
- ReservationAllocation cs = plan.getReservationById(reservationID);
-
- assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
- assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
-
- System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
- + ")----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
-
- }
-
- @Test
- public void testAllImpossible() throws PlanningException {
- prepareBasicPlan();
- // create an ALL request, with an impossible combination, it should be
- // rejected, and allocation remain unchanged
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
- rr.setArrival(100L);
- rr.setDeadline(120L);
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
- ReservationRequest r = ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), 55, 5, 10);
- ReservationRequest r2 = ReservationRequest.newInstance(
- Resource.newInstance(2048, 2), 55, 5, 20);
-
- List<ReservationRequest> list = new ArrayList<ReservationRequest>();
- list.add(r);
- list.add(r2);
- reqs.setReservationResources(list);
- rr.setReservationRequests(reqs);
-
- ReservationId reservationID = ReservationSystemTestUtil
- .getNewReservationId();
- boolean result = false;
- try {
- // submit to agent
- result = agent.createReservation(reservationID, "u1", plan, rr);
- fail();
- } catch (PlanningException p) {
- // expected
- }
-
- // validate results, we expect the second one to be accepted
- assertFalse("Agent-based allocation failed", result);
- assertTrue("Agent-based allocation failed", plan.getAllReservations()
- .size() == 2);
-
- System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: "
- + reservationID + ")----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
-
- }
-
- private void prepareBasicPlan() throws PlanningException {
-
- // insert in the reservation a couple of controlled reservations, to create
- // conditions for assignment that are non-empty
-
- int[] f = { 10, 10, 20, 20, 20, 10, 10 };
-
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), null, "u1",
- "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
- .generateAllocation(0, step, f), res, minAlloc)));
-
- int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
- Map<ReservationInterval, Resource> alloc =
- ReservationSystemTestUtil.generateAllocation(5000, step, f2);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(
- ReservationSystemTestUtil.getNewReservationId(), null, "u1",
- "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
-
- System.out.println("--------BEFORE AGENT----------");
- System.out.println(plan.toString());
- System.out.println(plan.toCumulativeString());
- }
-
- private boolean check(ReservationAllocation cs, long start, long end,
- int containers, int mem, int cores) {
-
- boolean res = true;
- for (long i = start; i < end; i++) {
- res = res
- && Resources.equals(cs.getResourcesAtTime(i),
- Resource.newInstance(mem * containers, cores * containers));
- }
- return res;
- }
-
- public void testStress(int numJobs) throws PlanningException, IOException {
-
- long timeWindow = 1000000L;
- Resource clusterCapacity = Resource.newInstance(500 * 100 * 1024, 500 * 32);
- step = 1000L;
- ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
- CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
- String reservationQ = testUtil.getFullReservationQueueName();
- float instConstraint = 100;
- float avgConstraint = 100;
- ReservationSchedulerConfiguration conf =
- ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
- instConstraint, avgConstraint);
- CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
- policy.init(reservationQ, conf);
-
- plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
- clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
-
- int acc = 0;
- List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
- for (long i = 0; i < numJobs; i++) {
- list.add(ReservationSystemTestUtil.generateRandomRR(rand, i));
- }
-
- long start = System.currentTimeMillis();
- for (int i = 0; i < numJobs; i++) {
-
- try {
- if (agent.createReservation(
- ReservationSystemTestUtil.getNewReservationId(), "u" + i % 100,
- plan, list.get(i))) {
- acc++;
- }
- } catch (PlanningException p) {
- // ignore exceptions
- }
- }
-
- long end = System.currentTimeMillis();
- System.out.println("Submitted " + numJobs + " jobs " + " accepted " + acc
- + " in " + (end - start) + "ms");
- }
-
- public static void main(String[] arg) {
-
- // run a stress test with by default 1000 random jobs
- int numJobs = 1000;
- if (arg.length > 0) {
- numJobs = Integer.parseInt(arg[0]);
- }
-
- try {
- TestGreedyReservationAgent test = new TestGreedyReservationAgent();
- test.setup();
- test.testStress(numJobs);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index 722fb29..b6d24b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
index 1e15618..809892c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
index d0f4dc6..f0cc49c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -164,6 +164,53 @@ public class TestRLESparseResourceAllocation {
Assert.assertTrue(rleSparseVector.isEmpty());
}
+ @Test
+ public void testToIntervalMap() {
+ ResourceCalculator resCalc = new DefaultResourceCalculator();
+ Resource minAlloc = Resource.newInstance(1, 1);
+ RLESparseResourceAllocation rleSparseVector =
+ new RLESparseResourceAllocation(resCalc, minAlloc);
+ Map<ReservationInterval, Resource> mapAllocations;
+
+ // Check empty
+ mapAllocations = rleSparseVector.toIntervalMap();
+ Assert.assertTrue(mapAllocations.isEmpty());
+
+ // Check full
+ int[] alloc = { 0, 5, 10, 10, 5, 0, 5, 0 };
+ int start = 100;
+ Set<Entry<ReservationInterval, Resource>> inputs =
+ generateAllocation(start, alloc, false).entrySet();
+ for (Entry<ReservationInterval, Resource> ip : inputs) {
+ rleSparseVector.addInterval(ip.getKey(), ip.getValue());
+ }
+ mapAllocations = rleSparseVector.toIntervalMap();
+ Assert.assertTrue(mapAllocations.size() == 5);
+ for (Entry<ReservationInterval, Resource> entry : mapAllocations
+ .entrySet()) {
+ ReservationInterval interval = entry.getKey();
+ Resource resource = entry.getValue();
+ if (interval.getStartTime() == 101L) {
+ Assert.assertTrue(interval.getEndTime() == 102L);
+ Assert.assertEquals(resource, Resource.newInstance(5 * 1024, 5));
+ } else if (interval.getStartTime() == 102L) {
+ Assert.assertTrue(interval.getEndTime() == 104L);
+ Assert.assertEquals(resource, Resource.newInstance(10 * 1024, 10));
+ } else if (interval.getStartTime() == 104L) {
+ Assert.assertTrue(interval.getEndTime() == 105L);
+ Assert.assertEquals(resource, Resource.newInstance(5 * 1024, 5));
+ } else if (interval.getStartTime() == 105L) {
+ Assert.assertTrue(interval.getEndTime() == 106L);
+ Assert.assertEquals(resource, Resource.newInstance(0 * 1024, 0));
+ } else if (interval.getStartTime() == 106L) {
+ Assert.assertTrue(interval.getEndTime() == 107L);
+ Assert.assertEquals(resource, Resource.newInstance(5 * 1024, 5));
+ } else {
+ Assert.fail();
+ }
+ }
+ }
+
private Map<ReservationInterval, Resource> generateAllocation(
int startTime, int[] alloc, boolean isStep) {
Map<ReservationInterval, Resource> req =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
index 50df8fe..f5625fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
deleted file mode 100644
index d4a97ba..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.junit.Test;
-
-public class TestSimpleCapacityReplanner {
-
- @Test
- public void testReplanningPlanCapacityLoss() throws PlanningException {
-
- Resource clusterCapacity = Resource.newInstance(100 * 1024, 10);
- Resource minAlloc = Resource.newInstance(1024, 1);
- Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
-
- ResourceCalculator res = new DefaultResourceCalculator();
- long step = 1L;
- Clock clock = mock(Clock.class);
- ReservationAgent agent = mock(ReservationAgent.class);
-
- SharingPolicy policy = new NoOverCommitPolicy();
- policy.init("root.dedicated", null);
-
- QueueMetrics queueMetrics = mock(QueueMetrics.class);
-
- when(clock.getTime()).thenReturn(0L);
- SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
-
- ReservationSchedulerConfiguration conf =
- mock(ReservationSchedulerConfiguration.class);
- when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
-
- enf.init("blah", conf);
-
- // Initialize the plan with more resources
- InMemoryPlan plan =
- new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
- res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
-
- // add reservation filling the plan (separating them 1ms, so we are sure
- // s2 follows s1 on acceptance
- long ts = System.currentTimeMillis();
- ReservationId r1 = ReservationId.newInstance(ts, 1);
- int[] f5 = { 20, 20, 20, 20, 20 };
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
- "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
- minAlloc)));
- when(clock.getTime()).thenReturn(1L);
- ReservationId r2 = ReservationId.newInstance(ts, 2);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
- "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
- minAlloc)));
- when(clock.getTime()).thenReturn(2L);
- ReservationId r3 = ReservationId.newInstance(ts, 3);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
- "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
- minAlloc)));
- when(clock.getTime()).thenReturn(3L);
- ReservationId r4 = ReservationId.newInstance(ts, 4);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
- "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
- minAlloc)));
- when(clock.getTime()).thenReturn(4L);
- ReservationId r5 = ReservationId.newInstance(ts, 5);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
- "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
- minAlloc)));
-
- int[] f6 = { 50, 50, 50, 50, 50 };
- ReservationId r6 = ReservationId.newInstance(ts, 6);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
- "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
- minAlloc)));
- when(clock.getTime()).thenReturn(6L);
- ReservationId r7 = ReservationId.newInstance(ts, 7);
- assertTrue(plan.toString(),
- plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
- "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
- minAlloc)));
-
- // remove some of the resources (requires replanning)
- plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));
-
- when(clock.getTime()).thenReturn(0L);
-
- // run the replanner
- enf.plan(plan, null);
-
- // check which reservation are still present
- assertNotNull(plan.getReservationById(r1));
- assertNotNull(plan.getReservationById(r2));
- assertNotNull(plan.getReservationById(r3));
- assertNotNull(plan.getReservationById(r6));
- assertNotNull(plan.getReservationById(r7));
-
- // and which ones are removed
- assertNull(plan.getReservationById(r4));
- assertNull(plan.getReservationById(r5));
-
- // check resources at each moment in time no more exceed capacity
- for (int i = 0; i < 20; i++) {
- int tot = 0;
- for (ReservationAllocation r : plan.getReservationsAtTime(i)) {
- tot = r.getResourcesAtTime(i).getMemory();
- }
- assertTrue(tot <= 70 * 1024);
- }
- }
-
- private Map<ReservationInterval, Resource> generateAllocation(
- int startTime, int[] alloc) {
- Map<ReservationInterval, Resource> req =
- new TreeMap<ReservationInterval, Resource>();
- for (int i = 0; i < alloc.length; i++) {
- req.put(new ReservationInterval(startTime + i, startTime + i + 1),
- ReservationSystemUtil.toResource(
- ReservationRequest.newInstance(Resource.newInstance(1024, 1),
- alloc[i])));
- }
- return req;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
new file mode 100644
index 0000000..9a1621a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
@@ -0,0 +1,820 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestAlignedPlanner {
+
+ ReservationAgent agent;
+ InMemoryPlan plan;
+ Resource minAlloc = Resource.newInstance(1024, 1);
+ ResourceCalculator res = new DefaultResourceCalculator();
+ Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+ Random rand = new Random();
+ long step;
+
+ @Test
+ public void testSingleReservationAccept() throws PlanningException {
+
+ // Prepare basic plan
+ int numJobsInScenario = initializeScenario1();
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 5 * step, // Job arrival time
+ 20 * step, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), // Capability
+ 10, // Num containers
+ 5, // Concurrency
+ 10 * step) }, // Duration
+ ReservationRequestInterpreter.R_ORDER, "u1");
+
+ // Add reservation
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+
+ // CHECK: allocation was accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == numJobsInScenario + 1);
+
+ // Get reservation
+ ReservationAllocation alloc1 = plan.getReservationById(reservationID);
+
+ // Verify allocation
+ assertTrue(alloc1.toString(),
+ check(alloc1, 10 * step, 20 * step, 10, 2048, 2));
+
+ }
+
+ @Test
+ public void testOrderNoGapImpossible() throws PlanningException {
+
+ // Prepare basic plan
+ int numJobsInScenario = initializeScenario2();
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 10L, // Job arrival time
+ 15 * step, // Job deadline
+ new ReservationRequest[] {
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ step), // Duration
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ step) }, // Duration
+ ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1");
+
+ // Add reservation
+ try {
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+ fail();
+ } catch (PlanningException e) {
+ // Expected failure
+ }
+
+ // CHECK: allocation was not accepted
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == numJobsInScenario);
+
+ }
+
+ @Test
+ public void testOrderNoGapImpossible2() throws PlanningException {
+
+ // Prepare basic plan
+ int numJobsInScenario = initializeScenario2();
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 13 * step, // Job deadline
+ new ReservationRequest[] {
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ step), // Duration
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 10, // Num containers
+ 10, // Concurrency
+ step) }, // Duration
+ ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1");
+
+ // Add reservation
+ try {
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+ fail();
+ } catch (PlanningException e) {
+ // Expected failure
+ }
+
+ // CHECK: allocation was not accepted
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == numJobsInScenario);
+
+ }
+
+ @Test
+ public void testOrderImpossible() throws PlanningException {
+
+ // Prepare basic plan
+ int numJobsInScenario = initializeScenario2();
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 15 * step, // Job deadline
+ new ReservationRequest[] {
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ 2 * step), // Duration
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ step) }, // Duration
+ ReservationRequestInterpreter.R_ORDER, "u1");
+
+ // Add reservation
+ try {
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+ fail();
+ } catch (PlanningException e) {
+ // Expected failure
+ }
+
+ // CHECK: allocation was not accepted
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == numJobsInScenario);
+
+ }
+
+ @Test
+ public void testAnyImpossible() throws PlanningException {
+
+ // Prepare basic plan
+ int numJobsInScenario = initializeScenario2();
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 15 * step, // Job deadline
+ new ReservationRequest[] {
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ 3 * step), // Duration
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ 2 * step) }, // Duration
+ ReservationRequestInterpreter.R_ANY, "u1");
+
+ // Add reservation
+ try {
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+ fail();
+ } catch (PlanningException e) {
+ // Expected failure
+ }
+
+ // CHECK: allocation was not accepted
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == numJobsInScenario);
+
+ }
+
+ @Test
+ public void testAnyAccept() throws PlanningException {
+
+ // Prepare basic plan
+ int numJobsInScenario = initializeScenario2();
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 15 * step, // Job deadline
+ new ReservationRequest[] {
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ step), // Duration
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ 2 * step) }, // Duration
+ ReservationRequestInterpreter.R_ANY, "u1");
+
+ // Add reservation
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+
+ // CHECK: allocation was accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == numJobsInScenario + 1);
+
+ // Get reservation
+ ReservationAllocation alloc1 = plan.getReservationById(reservationID);
+
+ // Verify allocation
+ assertTrue(alloc1.toString(),
+ check(alloc1, 14 * step, 15 * step, 20, 1024, 1));
+
+ }
+
+ @Test
+ public void testAllAccept() throws PlanningException {
+
+ // Prepare basic plan
+ int numJobsInScenario = initializeScenario2();
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 15 * step, // Job deadline
+ new ReservationRequest[] {
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ step), // Duration
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ step) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u1");
+
+ // Add reservation
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+
+ // CHECK: allocation was accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == numJobsInScenario + 1);
+
+ // Get reservation
+ ReservationAllocation alloc1 = plan.getReservationById(reservationID);
+
+ // Verify allocation
+ assertTrue(alloc1.toString(),
+ check(alloc1, 10 * step, 11 * step, 20, 1024, 1));
+ assertTrue(alloc1.toString(),
+ check(alloc1, 14 * step, 15 * step, 20, 1024, 1));
+
+ }
+
+ @Test
+ public void testAllImpossible() throws PlanningException {
+
+ // Prepare basic plan
+ int numJobsInScenario = initializeScenario2();
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 15 * step, // Job deadline
+ new ReservationRequest[] {
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ step), // Duration
+ ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ 2 * step) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u1");
+
+ // Add reservation
+ try {
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+ fail();
+ } catch (PlanningException e) {
+ // Expected failure
+ }
+
+ // CHECK: allocation was not accepted
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == numJobsInScenario);
+
+ }
+
+ @Test
+ public void testUpdate() throws PlanningException {
+
+ // Create flexible reservation
+ ReservationDefinition rrFlex =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 14 * step, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 100, // Num containers
+ 1, // Concurrency
+ 2 * step) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u1");
+
+ // Create blocking reservation
+ ReservationDefinition rrBlock =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 11 * step, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 100, // Num containers
+ 100, // Concurrency
+ step) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u1");
+
+ // Create reservation IDs
+ ReservationId flexReservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ ReservationId blockReservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+
+ // Add block, add flex, remove block, update flex
+ agent.createReservation(blockReservationID, "uBlock", plan, rrBlock);
+ agent.createReservation(flexReservationID, "uFlex", plan, rrFlex);
+ agent.deleteReservation(blockReservationID, "uBlock", plan);
+ agent.updateReservation(flexReservationID, "uFlex", plan, rrFlex);
+
+ // CHECK: allocation was accepted
+ assertTrue("Agent-based allocation failed", flexReservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 1);
+
+ // Get reservation
+ ReservationAllocation alloc1 = plan.getReservationById(flexReservationID);
+
+ // Verify allocation
+ assertTrue(alloc1.toString(),
+ check(alloc1, 10 * step, 14 * step, 50, 1024, 1));
+
+ }
+
+ @Test
+ public void testImpossibleDuration() throws PlanningException {
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 15 * step, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 20, // Num containers
+ 20, // Concurrency
+ 10 * step) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u1");
+
+ // Add reservation
+ try {
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+ fail();
+ } catch (PlanningException e) {
+ // Expected failure
+ }
+
+ // CHECK: allocation was not accepted
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == 0);
+
+ }
+
+ @Test
+ public void testLoadedDurationIntervals() throws PlanningException {
+
+ int numJobsInScenario = initializeScenario3();
+
+ // Create reservation
+ ReservationDefinition rr1 =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 13 * step, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 80, // Num containers
+ 10, // Concurrency
+ step) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u1");
+
+ // Add reservation
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr1);
+
+ // CHECK: allocation was accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == numJobsInScenario + 1);
+
+ // Get reservation
+ ReservationAllocation alloc1 = plan.getReservationById(reservationID);
+
+ // Verify allocation
+ assertTrue(alloc1.toString(),
+ check(alloc1, 10 * step, 11 * step, 20, 1024, 1));
+ assertTrue(alloc1.toString(),
+ check(alloc1, 11 * step, 12 * step, 20, 1024, 1));
+ assertTrue(alloc1.toString(),
+ check(alloc1, 12 * step, 13 * step, 40, 1024, 1));
+ }
+
+ @Test
+ public void testCostFunction() throws PlanningException {
+
+ // Create large memory reservation
+ ReservationDefinition rr7Mem1Core =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 11 * step, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(7 * 1024, 1),// Capability
+ 1, // Num containers
+ 1, // Concurrency
+ step) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u1");
+
+ // Create reservation
+ ReservationDefinition rr6Mem6Cores =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 11 * step, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(6 * 1024, 6),// Capability
+ 1, // Num containers
+ 1, // Concurrency
+ step) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u2");
+
+ // Create reservation
+ ReservationDefinition rr =
+ createReservationDefinition(
+ 10 * step, // Job arrival time
+ 12 * step, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 1, // Num containers
+ 1, // Concurrency
+ step) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u3");
+
+ // Create reservation IDs
+ ReservationId reservationID1 =
+ ReservationSystemTestUtil.getNewReservationId();
+ ReservationId reservationID2 =
+ ReservationSystemTestUtil.getNewReservationId();
+ ReservationId reservationID3 =
+ ReservationSystemTestUtil.getNewReservationId();
+
+ // Add all
+ agent.createReservation(reservationID1, "u1", plan, rr7Mem1Core);
+ agent.createReservation(reservationID2, "u2", plan, rr6Mem6Cores);
+ agent.createReservation(reservationID3, "u3", plan, rr);
+
+ // Get reservation
+ ReservationAllocation alloc3 = plan.getReservationById(reservationID3);
+
+ assertTrue(alloc3.toString(),
+ check(alloc3, 10 * step, 11 * step, 0, 1024, 1));
+ assertTrue(alloc3.toString(),
+ check(alloc3, 11 * step, 12 * step, 1, 1024, 1));
+
+ }
+
+ @Test
+ public void testFromCluster() throws PlanningException {
+
+ // int numJobsInScenario = initializeScenario3();
+
+ List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
+
+ // Create reservation
+ list.add(createReservationDefinition(
+ 1425716392178L, // Job arrival time
+ 1425722262791L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 7, // Num containers
+ 1, // Concurrency
+ 587000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u1"));
+
+ list.add(createReservationDefinition(
+ 1425716406178L, // Job arrival time
+ 1425721255841L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 6, // Num containers
+ 1, // Concurrency
+ 485000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u2"));
+
+ list.add(createReservationDefinition(
+ 1425716399178L, // Job arrival time
+ 1425723780138L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 6, // Num containers
+ 1, // Concurrency
+ 738000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u3"));
+
+ list.add(createReservationDefinition(
+ 1425716437178L, // Job arrival time
+ 1425722968378L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 7, // Num containers
+ 1, // Concurrency
+ 653000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u4"));
+
+ list.add(createReservationDefinition(
+ 1425716406178L, // Job arrival time
+ 1425721926090L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 6, // Num containers
+ 1, // Concurrency
+ 552000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u5"));
+
+ list.add(createReservationDefinition(
+ 1425716379178L, // Job arrival time
+ 1425722238553L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 6, // Num containers
+ 1, // Concurrency
+ 586000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u6"));
+
+ list.add(createReservationDefinition(
+ 1425716407178L, // Job arrival time
+ 1425722908317L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 7, // Num containers
+ 1, // Concurrency
+ 650000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u7"));
+
+ list.add(createReservationDefinition(
+ 1425716452178L, // Job arrival time
+ 1425722841562L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 6, // Num containers
+ 1, // Concurrency
+ 639000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u8"));
+
+ list.add(createReservationDefinition(
+ 1425716384178L, // Job arrival time
+ 1425721766129L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 7, // Num containers
+ 1, // Concurrency
+ 538000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u9"));
+
+ list.add(createReservationDefinition(
+ 1425716437178L, // Job arrival time
+ 1425722507886L, // Job deadline
+ new ReservationRequest[] { ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), // Capability
+ 5, // Num containers
+ 1, // Concurrency
+ 607000) }, // Duration
+ ReservationRequestInterpreter.R_ALL, "u10"));
+
+ // Add reservation
+ int i = 1;
+ for (ReservationDefinition rr : list) {
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u" + Integer.toString(i), plan,
+ rr);
+ ++i;
+ }
+
+ // CHECK: allocation was accepted
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == list.size());
+
+ }
+
+ @Before
+ public void setup() throws Exception {
+
+ // Initialize random seed
+ long seed = rand.nextLong();
+ rand.setSeed(seed);
+ Log.info("Running with seed: " + seed);
+
+ // Set cluster parameters
+ long timeWindow = 1000000L;
+ int capacityMem = 100 * 1024;
+ int capacityCores = 100;
+ step = 60000L;
+
+ Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores);
+
+ // Set configuration
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ String reservationQ = testUtil.getFullReservationQueueName();
+ float instConstraint = 100;
+ float avgConstraint = 100;
+
+ ReservationSchedulerConfiguration conf =
+ ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
+ instConstraint, avgConstraint);
+
+ CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+ policy.init(reservationQ, conf);
+
+ QueueMetrics queueMetrics = mock(QueueMetrics.class);
+
+ // Set planning agent
+ agent = new AlignedPlannerWithGreedy();
+
+ // Create Plan
+ plan =
+ new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+ res, minAlloc, maxAlloc, "dedicated", null, true);
+ }
+
+ private int initializeScenario1() throws PlanningException {
+
+ // insert in the reservation a couple of controlled reservations, to create
+ // conditions for assignment that are non-empty
+
+ addFixedAllocation(0L, step, new int[] { 10, 10, 20, 20, 20, 10, 10 });
+
+ System.out.println("--------BEFORE AGENT----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ return 1;
+
+ }
+
+ private int initializeScenario2() throws PlanningException {
+
+ // insert in the reservation a couple of controlled reservations, to create
+ // conditions for assignment that are non-empty
+
+ addFixedAllocation(11 * step, step, new int[] { 90, 90, 90 });
+
+ System.out.println("--------BEFORE AGENT----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ return 1;
+
+ }
+
+ private int initializeScenario3() throws PlanningException {
+
+ // insert in the reservation a couple of controlled reservations, to create
+ // conditions for assignment that are non-empty
+
+ addFixedAllocation(10 * step, step, new int[] { 70, 80, 60 });
+
+ System.out.println("--------BEFORE AGENT----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ return 1;
+
+ }
+
+ private void addFixedAllocation(long start, long step, int[] f)
+ throws PlanningException {
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null,
+ "user_fixed", "dedicated", start, start + f.length * step,
+ ReservationSystemTestUtil.generateAllocation(start, step, f), res,
+ minAlloc)));
+
+ }
+
+ private ReservationDefinition createReservationDefinition(long arrival,
+ long deadline, ReservationRequest[] reservationRequests,
+ ReservationRequestInterpreter rType, String username) {
+
+ return ReservationDefinition.newInstance(arrival, deadline,
+ ReservationRequests.newInstance(Arrays.asList(reservationRequests),
+ rType), username);
+
+ }
+
+ private boolean check(ReservationAllocation alloc, long start, long end,
+ int containers, int mem, int cores) {
+
+ Resource expectedResources =
+ Resource.newInstance(mem * containers, cores * containers);
+
+ // Verify that all allocations in [start,end) equal containers * (mem,cores)
+ for (long i = start; i < end; i++) {
+ if (!Resources.equals(alloc.getResourcesAtTime(i), expectedResources)) {
+ return false;
+ }
+ }
+ return true;
+
+ }
+
+}
[4/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement
Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
(cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)
Posted by cu...@apache.org.
YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
(cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/26ea0458
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/26ea0458
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/26ea0458
Branch: refs/heads/branch-2
Commit: 26ea0458143cf09c01b5763622852bcbf9fd15ca
Parents: 621203b
Author: ccurino <cc...@ubuntu.gateway.2wire.net>
Authored: Sat Jul 25 07:39:47 2015 -0700
Committer: ccurino <cc...@ubuntu.gateway.2wire.net>
Committed: Sat Jul 25 07:47:11 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../reservation/AbstractReservationSystem.java | 2 +
.../reservation/GreedyReservationAgent.java | 390 ---------
.../reservation/InMemoryPlan.java | 13 +-
.../InMemoryReservationAllocation.java | 8 +-
.../resourcemanager/reservation/Plan.java | 1 +
.../reservation/PlanContext.java | 2 +
.../resourcemanager/reservation/PlanView.java | 31 +-
.../resourcemanager/reservation/Planner.java | 47 --
.../RLESparseResourceAllocation.java | 55 +-
.../reservation/ReservationAgent.java | 72 --
.../ReservationSchedulerConfiguration.java | 6 +-
.../reservation/ReservationSystem.java | 5 +-
.../reservation/ReservationSystemUtil.java | 6 +-
.../reservation/SimpleCapacityReplanner.java | 113 ---
.../planning/AlignedPlannerWithGreedy.java | 123 +++
.../planning/GreedyReservationAgent.java | 97 +++
.../reservation/planning/IterativePlanner.java | 338 ++++++++
.../reservation/planning/Planner.java | 49 ++
.../reservation/planning/PlanningAlgorithm.java | 207 +++++
.../reservation/planning/ReservationAgent.java | 73 ++
.../planning/SimpleCapacityReplanner.java | 118 +++
.../reservation/planning/StageAllocator.java | 55 ++
.../planning/StageAllocatorGreedy.java | 152 ++++
.../planning/StageAllocatorLowCostAligned.java | 360 ++++++++
.../planning/StageEarliestStart.java | 46 ++
.../planning/StageEarliestStartByDemand.java | 106 +++
.../StageEarliestStartByJobArrival.java | 39 +
.../planning/TryManyReservationAgents.java | 114 +++
.../reservation/ReservationSystemTestUtil.java | 5 +-
.../reservation/TestCapacityOverTimePolicy.java | 2 +-
.../TestCapacitySchedulerPlanFollower.java | 1 +
.../reservation/TestFairReservationSystem.java | 1 -
.../TestFairSchedulerPlanFollower.java | 1 +
.../reservation/TestGreedyReservationAgent.java | 604 --------------
.../reservation/TestInMemoryPlan.java | 2 +
.../reservation/TestNoOverCommitPolicy.java | 1 +
.../TestRLESparseResourceAllocation.java | 51 +-
.../TestSchedulerPlanFollowerBase.java | 1 +
.../TestSimpleCapacityReplanner.java | 162 ----
.../planning/TestAlignedPlanner.java | 820 +++++++++++++++++++
.../planning/TestGreedyReservationAgent.java | 611 ++++++++++++++
.../planning/TestSimpleCapacityReplanner.java | 170 ++++
43 files changed, 3634 insertions(+), 1429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b803b89..c3f2015 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -92,6 +92,9 @@ Release 2.8.0 - UNRELEASED
YARN-2019. Retrospect on decision of making RM crashed if any exception throw
in ZKRMStateStore. (Jian He via junping_du)
+ YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations.
+ (Jonathan Yaniv and Ishai Menache via curino)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
index 8a15ac6..d2603c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
deleted file mode 100644
index 214df1c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This Agent employs a simple greedy placement strategy, placing the various
- * stages of a {@link ReservationRequest} from the deadline moving backward
- * towards the arrival. This allows jobs with earlier deadline to be scheduled
- * greedily as well. Combined with an opportunistic anticipation of work if the
- * cluster is not fully utilized also seems to provide good latency for
- * best-effort jobs (i.e., jobs running without a reservation).
- *
- * This agent does not account for locality and only consider container
- * granularity for validation purposes (i.e., you can't exceed max-container
- * size).
- */
-public class GreedyReservationAgent implements ReservationAgent {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(GreedyReservationAgent.class);
-
- @Override
- public boolean createReservation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract) throws PlanningException {
- return computeAllocation(reservationId, user, plan, contract, null);
- }
-
- @Override
- public boolean updateReservation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract) throws PlanningException {
- return computeAllocation(reservationId, user, plan, contract,
- plan.getReservationById(reservationId));
- }
-
- @Override
- public boolean deleteReservation(ReservationId reservationId, String user,
- Plan plan) throws PlanningException {
- return plan.deleteReservation(reservationId);
- }
-
- private boolean computeAllocation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract,
- ReservationAllocation oldReservation) throws PlanningException,
- ContractValidationException {
- LOG.info("placing the following ReservationRequest: " + contract);
-
- Resource totalCapacity = plan.getTotalCapacity();
-
- // Here we can addd logic to adjust the ResourceDefinition to account for
- // system "imperfections" (e.g., scheduling delays for large containers).
-
- // Align with plan step conservatively (i.e., ceil arrival, and floor
- // deadline)
- long earliestStart = contract.getArrival();
- long step = plan.getStep();
- if (earliestStart % step != 0) {
- earliestStart = earliestStart + (step - (earliestStart % step));
- }
- long deadline =
- contract.getDeadline() - contract.getDeadline() % plan.getStep();
-
- // setup temporary variables to handle time-relations between stages and
- // intermediate answers
- long curDeadline = deadline;
- long oldDeadline = -1;
-
- Map<ReservationInterval, Resource> allocations =
- new HashMap<ReservationInterval, Resource>();
- RLESparseResourceAllocation tempAssigned =
- new RLESparseResourceAllocation(plan.getResourceCalculator(),
- plan.getMinimumAllocation());
-
- List<ReservationRequest> stages = contract.getReservationRequests()
- .getReservationResources();
- ReservationRequestInterpreter type = contract.getReservationRequests()
- .getInterpreter();
-
- boolean hasGang = false;
-
- // Iterate the stages in backward from deadline
- for (ListIterator<ReservationRequest> li =
- stages.listIterator(stages.size()); li.hasPrevious();) {
-
- ReservationRequest currentReservationStage = li.previous();
-
- // validate the RR respect basic constraints
- validateInput(plan, currentReservationStage, totalCapacity);
-
- hasGang |= currentReservationStage.getConcurrency() > 1;
-
- // run allocation for a single stage
- Map<ReservationInterval, Resource> curAlloc =
- placeSingleStage(plan, tempAssigned, currentReservationStage,
- earliestStart, curDeadline, oldReservation, totalCapacity);
-
- if (curAlloc == null) {
- // if we did not find an allocation for the currentReservationStage
- // return null, unless the ReservationDefinition we are placing is of
- // type ANY
- if (type != ReservationRequestInterpreter.R_ANY) {
- throw new PlanningException("The GreedyAgent"
- + " couldn't find a valid allocation for your request");
- } else {
- continue;
- }
- } else {
-
- // if we did find an allocation add it to the set of allocations
- allocations.putAll(curAlloc);
-
- // if this request is of type ANY we are done searching (greedy)
- // and can return the current allocation (break-out of the search)
- if (type == ReservationRequestInterpreter.R_ANY) {
- break;
- }
-
- // if the request is of ORDER or ORDER_NO_GAP we constraint the next
- // round of allocation to precede the current allocation, by setting
- // curDeadline
- if (type == ReservationRequestInterpreter.R_ORDER
- || type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
- curDeadline = findEarliestTime(curAlloc.keySet());
-
- // for ORDER_NO_GAP verify that the allocation found so far has no
- // gap, return null otherwise (the greedy procedure failed to find a
- // no-gap
- // allocation)
- if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
- && oldDeadline > 0) {
- if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan
- .getStep()) {
- throw new PlanningException("The GreedyAgent"
- + " couldn't find a valid allocation for your request");
- }
- }
- // keep the variable oldDeadline pointing to the last deadline we
- // found
- oldDeadline = curDeadline;
- }
- }
- }
-
- // / If we got here is because we failed to find an allocation for the
- // ReservationDefinition give-up and report failure to the user
- if (allocations.isEmpty()) {
- throw new PlanningException("The GreedyAgent"
- + " couldn't find a valid allocation for your request");
- }
-
- // create reservation with above allocations if not null/empty
-
- Resource ZERO_RES = Resource.newInstance(0, 0);
-
- long firstStartTime = findEarliestTime(allocations.keySet());
-
- // add zero-padding from arrival up to the first non-null allocation
- // to guarantee that the reservation exists starting at arrival
- if (firstStartTime > earliestStart) {
- allocations.put(new ReservationInterval(earliestStart,
- firstStartTime), ZERO_RES);
- firstStartTime = earliestStart;
- // consider to add trailing zeros at the end for simmetry
- }
-
- // Actually add/update the reservation in the plan.
- // This is subject to validation as other agents might be placing
- // in parallel and there might be sharing policies the agent is not
- // aware off.
- ReservationAllocation capReservation =
- new InMemoryReservationAllocation(reservationId, contract, user,
- plan.getQueueName(), firstStartTime,
- findLatestTime(allocations.keySet()), allocations,
- plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang);
- if (oldReservation != null) {
- return plan.updateReservation(capReservation);
- } else {
- return plan.addReservation(capReservation);
- }
- }
-
- private void validateInput(Plan plan, ReservationRequest rr,
- Resource totalCapacity) throws ContractValidationException {
-
- if (rr.getConcurrency() < 1) {
- throw new ContractValidationException("Gang Size should be >= 1");
- }
-
- if (rr.getNumContainers() <= 0) {
- throw new ContractValidationException("Num containers should be >= 0");
- }
-
- // check that gangSize and numContainers are compatible
- if (rr.getNumContainers() % rr.getConcurrency() != 0) {
- throw new ContractValidationException(
- "Parallelism must be an exact multiple of gang size");
- }
-
- // check that the largest container request does not exceed
- // the cluster-wide limit for container sizes
- if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
- rr.getCapability(), plan.getMaximumAllocation())) {
- throw new ContractValidationException("Individual"
- + " capability requests should not exceed cluster's maxAlloc");
- }
- }
-
- /**
- * This method actually perform the placement of an atomic stage of the
- * reservation. The key idea is to traverse the plan backward for a
- * "lease-duration" worth of time, and compute what is the maximum multiple of
- * our concurrency (gang) parameter we can fit. We do this and move towards
- * previous instant in time until the time-window is exhausted or we placed
- * all the user request.
- */
- private Map<ReservationInterval, Resource> placeSingleStage(
- Plan plan, RLESparseResourceAllocation tempAssigned,
- ReservationRequest rr, long earliestStart, long curDeadline,
- ReservationAllocation oldResAllocation, final Resource totalCapacity) {
-
- Map<ReservationInterval, Resource> allocationRequests =
- new HashMap<ReservationInterval, Resource>();
-
- // compute the gang as a resource and get the duration
- Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
- long dur = rr.getDuration();
- long step = plan.getStep();
-
- // ceil the duration to the next multiple of the plan step
- if (dur % step != 0) {
- dur += (step - (dur % step));
- }
-
- // we know for sure that this division has no remainder (part of contract
- // with user, validate before
- int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
-
- int maxGang = 0;
-
- // loop trying to place until we are done, or we are considering
- // an invalid range of times
- while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
-
- // as we run along we remember how many gangs we can fit, and what
- // was the most constraining moment in time (we will restart just
- // after that to place the next batch)
- maxGang = gangsToPlace;
- long minPoint = curDeadline;
- int curMaxGang = maxGang;
-
- // start placing at deadline (excluded due to [,) interval semantics and
- // move backward
- for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur
- && maxGang > 0; t = t - plan.getStep()) {
-
- // As we run along we will logically remove the previous allocation for
- // this reservation
- // if one existed
- Resource oldResCap = Resource.newInstance(0, 0);
- if (oldResAllocation != null) {
- oldResCap = oldResAllocation.getResourcesAtTime(t);
- }
-
- // compute net available resources
- Resource netAvailableRes = Resources.clone(totalCapacity);
- Resources.addTo(netAvailableRes, oldResCap);
- Resources.subtractFrom(netAvailableRes,
- plan.getTotalCommittedResources(t));
- Resources.subtractFrom(netAvailableRes,
- tempAssigned.getCapacityAtTime(t));
-
- // compute maximum number of gangs we could fit
- curMaxGang =
- (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
- totalCapacity, netAvailableRes, gang));
-
- // pick the minimum between available resources in this instant, and how
- // many gangs we have to place
- curMaxGang = Math.min(gangsToPlace, curMaxGang);
-
- // compare with previous max, and set it. also remember *where* we found
- // the minimum (useful for next attempts)
- if (curMaxGang <= maxGang) {
- maxGang = curMaxGang;
- minPoint = t;
- }
- }
-
- // if we were able to place any gang, record this, and decrement
- // gangsToPlace
- if (maxGang > 0) {
- gangsToPlace -= maxGang;
-
- ReservationInterval reservationInt =
- new ReservationInterval(curDeadline - dur, curDeadline);
- ReservationRequest reservationRequest =
- ReservationRequest.newInstance(rr.getCapability(),
- rr.getConcurrency() * maxGang, rr.getConcurrency(),
- rr.getDuration());
- // remember occupied space (plan is read-only till we find a plausible
- // allocation for the entire request). This is needed since we might be
- // placing other ReservationRequest within the same
- // ReservationDefinition,
- // and we must avoid double-counting the available resources
- final Resource reservationRes = ReservationSystemUtil.toResource(
- reservationRequest);
- tempAssigned.addInterval(reservationInt, reservationRes);
- allocationRequests.put(reservationInt, reservationRes);
-
- }
-
- // reset our new starting point (curDeadline) to the most constraining
- // point so far, we will look "left" of that to find more places where
- // to schedule gangs (for sure nothing on the "right" of this point can
- // fit a full gang.
- curDeadline = minPoint;
- }
-
- // if no gangs are left to place we succeed and return the allocation
- if (gangsToPlace == 0) {
- return allocationRequests;
- } else {
- // If we are here is becasue we did not manage to satisfy this request.
- // So we need to remove unwanted side-effect from tempAssigned (needed
- // for ANY).
- for (Map.Entry<ReservationInterval, Resource> tempAllocation :
- allocationRequests.entrySet()) {
- tempAssigned.removeInterval(tempAllocation.getKey(),
- tempAllocation.getValue());
- }
- // and return null to signal failure in this allocation
- return null;
- }
- }
-
- // finds the leftmost point of this set of ReservationInterval
- private long findEarliestTime(Set<ReservationInterval> resInt) {
- long ret = Long.MAX_VALUE;
- for (ReservationInterval s : resInt) {
- if (s.getStartTime() < ret) {
- ret = s.getStartTime();
- }
- }
- return ret;
- }
-
- // finds the rightmost point of this set of ReservationIntervals
- private long findLatestTime(Set<ReservationInterval> resInt) {
- long ret = Long.MIN_VALUE;
- for (ReservationInterval s : resInt) {
- if (s.getEndTime() > ret) {
- ret = s.getEndTime();
- }
- }
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 50d66cf..abc9c98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -33,6 +33,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
@@ -41,7 +43,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class InMemoryPlan implements Plan {
+/**
+ * This class represents an in memory representation of the state of our
+ * reservation system, and provides accelerated access to both individual
+ * reservations and aggregate utilization of resources over time.
+ */
+public class InMemoryPlan implements Plan {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
@@ -75,7 +82,7 @@ class InMemoryPlan implements Plan {
private Resource totalCapacity;
- InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+ public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry) {
@@ -83,7 +90,7 @@ class InMemoryPlan implements Plan {
maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
}
- InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+ public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index a4dd23b..42a2243 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.util.resource.Resources;
/**
* An in memory implementation of a reservation allocation using the
* {@link RLESparseResourceAllocation}
- *
+ *
*/
-class InMemoryReservationAllocation implements ReservationAllocation {
+public class InMemoryReservationAllocation implements ReservationAllocation {
private final String planName;
private final ReservationId reservationID;
@@ -45,7 +45,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
private RLESparseResourceAllocation resourcesOverTime;
- InMemoryReservationAllocation(ReservationId reservationID,
+ public InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map<ReservationInterval, Resource> allocations,
@@ -54,7 +54,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
allocations, calculator, minAlloc, false);
}
- InMemoryReservationAllocation(ReservationId reservationID,
+ public InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map<ReservationInterval, Resource> allocations,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
index e8e9e29..f7ffbd0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/**
* A Plan represents the central data structure of a reservation system that
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
index 6d3506d..94e299e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index b49e99e..be68906 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -1,26 +1,27 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/**
* This interface provides a read-only view on the allocations made in this
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
deleted file mode 100644
index 57f28ff..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-
-public interface Planner {
-
- /**
- * Update the existing {@link Plan}, by adding/removing/updating existing
- * reservations, and adding a subset of the reservation requests in the
- * contracts parameter.
- *
- * @param plan the {@link Plan} to replan
- * @param contracts the list of reservation requests
- * @throws PlanningException
- */
- public void plan(Plan plan, List<ReservationDefinition> contracts)
- throws PlanningException;
-
- /**
- * Initialize the replanner
- *
- * @param planQueueName the name of the queue for this plan
- * @param conf the scheduler configuration
- */
- void init(String planQueueName, ReservationSchedulerConfiguration conf);
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 2957cc6..80f2ff7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -38,7 +38,7 @@ import com.google.gson.stream.JsonWriter;
/**
* This is a run length encoded sparse data structure that maintains resource
- * allocations over time
+ * allocations over time.
*/
public class RLESparseResourceAllocation {
@@ -74,7 +74,7 @@ public class RLESparseResourceAllocation {
/**
* Add a resource for the specified interval
- *
+ *
* @param reservationInterval the interval for which the resource is to be
* added
* @param totCap the resource to be added
@@ -138,7 +138,7 @@ public class RLESparseResourceAllocation {
/**
* Removes a resource for the specified interval
- *
+ *
* @param reservationInterval the interval for which the resource is to be
* removed
* @param totCap the resource to be removed
@@ -189,7 +189,7 @@ public class RLESparseResourceAllocation {
/**
* Returns the capacity, i.e. total resources allocated at the specified point
* of time
- *
+ *
* @param tick the time (UTC in ms) at which the capacity is requested
* @return the resources allocated at the specified time
*/
@@ -208,7 +208,7 @@ public class RLESparseResourceAllocation {
/**
* Get the timestamp of the earliest resource allocation
- *
+ *
* @return the timestamp of the first resource allocation
*/
public long getEarliestStartTime() {
@@ -226,7 +226,7 @@ public class RLESparseResourceAllocation {
/**
* Get the timestamp of the latest resource allocation
- *
+ *
* @return the timestamp of the last resource allocation
*/
public long getLatestEndTime() {
@@ -244,7 +244,7 @@ public class RLESparseResourceAllocation {
/**
* Returns true if there are no non-zero entries
- *
+ *
* @return true if there are no allocations or false otherwise
*/
public boolean isEmpty() {
@@ -287,7 +287,7 @@ public class RLESparseResourceAllocation {
/**
* Returns the JSON string representation of the current resources allocated
* over time
- *
+ *
* @return the JSON string representation of the current resources allocated
* over time
*/
@@ -312,4 +312,43 @@ public class RLESparseResourceAllocation {
}
}
+ /**
+ * Returns the representation of the current resources allocated over time as
+ * an interval map.
+ *
+ * @return the representation of the current resources allocated over time as
+ * an interval map.
+ */
+ public Map<ReservationInterval, Resource> toIntervalMap() {
+
+ readLock.lock();
+ try {
+ Map<ReservationInterval, Resource> allocations =
+ new TreeMap<ReservationInterval, Resource>();
+
+ // Empty
+ if (isEmpty()) {
+ return allocations;
+ }
+
+ Map.Entry<Long, Resource> lastEntry = null;
+ for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
+
+ if (lastEntry != null) {
+ ReservationInterval interval =
+ new ReservationInterval(lastEntry.getKey(), entry.getKey());
+ Resource resource = lastEntry.getValue();
+
+ allocations.put(interval, resource);
+ }
+
+ lastEntry = entry;
+ }
+ return allocations;
+ } finally {
+ readLock.unlock();
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
deleted file mode 100644
index 6955036..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-
-/**
- * An entity that seeks to acquire resources to satisfy an user's contract
- */
-public interface ReservationAgent {
-
- /**
- * Create a reservation for the user that abides by the specified contract
- *
- * @param reservationId the identifier of the reservation to be created.
- * @param user the user who wants to create the reservation
- * @param plan the Plan to which the reservation must be fitted
- * @param contract encapsulates the resources the user requires for his
- * session
- *
- * @return whether the create operation was successful or not
- * @throws PlanningException if the session cannot be fitted into the plan
- */
- public boolean createReservation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract) throws PlanningException;
-
- /**
- * Update a reservation for the user that abides by the specified contract
- *
- * @param reservationId the identifier of the reservation to be updated
- * @param user the user who wants to create the session
- * @param plan the Plan to which the reservation must be fitted
- * @param contract encapsulates the resources the user requires for his
- * reservation
- *
- * @return whether the update operation was successful or not
- * @throws PlanningException if the reservation cannot be fitted into the plan
- */
- public boolean updateReservation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract) throws PlanningException;
-
- /**
- * Delete an user reservation
- *
- * @param reservationId the identifier of the reservation to be deleted
- * @param user the user who wants to create the reservation
- * @param plan the Plan to which the session must be fitted
- *
- * @return whether the delete operation was successful or not
- * @throws PlanningException if the reservation cannot be fitted into the plan
- */
- public boolean deleteReservation(ReservationId reservationId, String user,
- Plan plan) throws PlanningException;
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
index 2af1ffd..c430b1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
public abstract class ReservationSchedulerConfiguration extends Configuration {
@@ -33,11 +35,11 @@ public abstract class ReservationSchedulerConfiguration extends Configuration {
@InterfaceAudience.Private
public static final String DEFAULT_RESERVATION_AGENT_NAME =
- "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
+ "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy";
@InterfaceAudience.Private
public static final String DEFAULT_RESERVATION_PLANNER_NAME =
- "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
+ "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner";
@InterfaceAudience.Private
public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
index cb76dcf..3309693 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -24,12 +24,13 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/**
* This interface is the one implemented by any system that wants to support
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
index 8affae4..5562adc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.HashMap;
import java.util.Map;
-final class ReservationSystemUtil {
+/**
+ * Simple helper class for static methods used to transform across
+ * common formats in tests
+ */
+public final class ReservationSystemUtil {
private ReservationSystemUtil() {
// not called
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
deleted file mode 100644
index b5a6a99..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.UTCClock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This (re)planner scan a period of time from now to a maximum time window (or
- * the end of the last session, whichever comes first) checking the overall
- * capacity is not violated.
- *
- * It greedily removes sessions in reversed order of acceptance (latest accepted
- * is the first removed).
- */
-public class SimpleCapacityReplanner implements Planner {
-
- private static final Log LOG = LogFactory
- .getLog(SimpleCapacityReplanner.class);
-
- private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
-
- private final Clock clock;
-
- // this allows to control to time-span of this replanning
- // far into the future time instants might be worth replanning for
- // later on
- private long lengthOfCheckZone;
-
- public SimpleCapacityReplanner() {
- this(new UTCClock());
- }
-
- @VisibleForTesting
- SimpleCapacityReplanner(Clock clock) {
- this.clock = clock;
- }
-
- @Override
- public void init(String planQueueName,
- ReservationSchedulerConfiguration conf) {
- this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
- }
-
- @Override
- public void plan(Plan plan, List<ReservationDefinition> contracts)
- throws PlanningException {
-
- if (contracts != null) {
- throw new RuntimeException(
- "SimpleCapacityReplanner cannot handle new reservation contracts");
- }
-
- ResourceCalculator resCalc = plan.getResourceCalculator();
- Resource totCap = plan.getTotalCapacity();
- long now = clock.getTime();
-
- // loop on all moment in time from now to the end of the check Zone
- // or the end of the planned sessions whichever comes first
- for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t +=
- plan.getStep()) {
- Resource excessCap =
- Resources.subtract(plan.getTotalCommittedResources(t), totCap);
- // if we are violating
- if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
- // sorted on reverse order of acceptance, so newest reservations first
- Set<ReservationAllocation> curReservations =
- new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
- for (Iterator<ReservationAllocation> resIter =
- curReservations.iterator(); resIter.hasNext()
- && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) {
- ReservationAllocation reservation = resIter.next();
- plan.deleteReservation(reservation.getReservationId());
- excessCap =
- Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
- LOG.info("Removing reservation " + reservation.getReservationId()
- + " to repair physical-resource constraints in the plan: "
- + plan.getQueueName());
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
new file mode 100644
index 0000000..a389928
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A planning algorithm that first runs LowCostAligned, and if it fails runs
+ * Greedy.
+ */
+public class AlignedPlannerWithGreedy implements ReservationAgent {
+
+ // Default smoothness factor
+ private static final int DEFAULT_SMOOTHNESS_FACTOR = 10;
+
+ // Log
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AlignedPlannerWithGreedy.class);
+
+ // Smoothness factor
+ private final ReservationAgent planner;
+
+ // Constructor
+ public AlignedPlannerWithGreedy() {
+ this(DEFAULT_SMOOTHNESS_FACTOR);
+ }
+
+ // Constructor
+ public AlignedPlannerWithGreedy(int smoothnessFactor) {
+
+ // List of algorithms
+ List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>();
+
+ // LowCostAligned planning algorithm
+ ReservationAgent algAligned =
+ new IterativePlanner(new StageEarliestStartByDemand(),
+ new StageAllocatorLowCostAligned(smoothnessFactor));
+ listAlg.add(algAligned);
+
+ // Greedy planning algorithm
+ ReservationAgent algGreedy =
+ new IterativePlanner(new StageEarliestStartByJobArrival(),
+ new StageAllocatorGreedy());
+ listAlg.add(algGreedy);
+
+ // Set planner:
+ // 1. Attempt to execute algAligned
+ // 2. If failed, fall back to algGreedy
+ planner = new TryManyReservationAgents(listAlg);
+
+ }
+
+ @Override
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ LOG.info("placing the following ReservationRequest: " + contract);
+
+ try {
+ boolean res =
+ planner.createReservation(reservationId, user, plan, contract);
+
+ if (res) {
+ LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+ + reservationId.toString() + ", Contract: " + contract.toString());
+ } else {
+ LOG.info("OUTCOME: FAILURE, Reservation ID: "
+ + reservationId.toString() + ", Contract: " + contract.toString());
+ }
+ return res;
+ } catch (PlanningException e) {
+ LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+ + ", Contract: " + contract.toString());
+ throw e;
+ }
+
+ }
+
+ @Override
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ LOG.info("updating the following ReservationRequest: " + contract);
+
+ return planner.updateReservation(reservationId, user, plan, contract);
+
+ }
+
+ @Override
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException {
+
+ LOG.info("removing the following ReservationId: " + reservationId);
+
+ return planner.deleteReservation(reservationId, user, plan);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
new file mode 100644
index 0000000..db82a66
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Agent employs a simple greedy placement strategy, placing the various
+ * stages of a {@link ReservationDefinition} from the deadline moving backward
+ * towards the arrival. This allows jobs with earlier deadline to be scheduled
+ * greedily as well. Combined with an opportunistic anticipation of work if the
+ * cluster is not fully utilized also seems to provide good latency for
+ * best-effort jobs (i.e., jobs running without a reservation).
+ *
+ * This agent does not account for locality and only consider container
+ * granularity for validation purposes (i.e., you can't exceed max-container
+ * size).
+ */
+
+public class GreedyReservationAgent implements ReservationAgent {
+
+ // Log
+ private static final Logger LOG = LoggerFactory
+ .getLogger(GreedyReservationAgent.class);
+
+ // Greedy planner
+ private final ReservationAgent planner = new IterativePlanner(
+ new StageEarliestStartByJobArrival(), new StageAllocatorGreedy());
+
+ @Override
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ LOG.info("placing the following ReservationRequest: " + contract);
+
+ try {
+ boolean res =
+ planner.createReservation(reservationId, user, plan, contract);
+
+ if (res) {
+ LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+ + reservationId.toString() + ", Contract: " + contract.toString());
+ } else {
+ LOG.info("OUTCOME: FAILURE, Reservation ID: "
+ + reservationId.toString() + ", Contract: " + contract.toString());
+ }
+ return res;
+ } catch (PlanningException e) {
+ LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+ + ", Contract: " + contract.toString());
+ throw e;
+ }
+
+ }
+
+ @Override
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ LOG.info("updating the following ReservationRequest: " + contract);
+
+ return planner.updateReservation(reservationId, user, plan, contract);
+
+ }
+
+ @Override
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException {
+
+ LOG.info("removing the following ReservationId: " + reservationId);
+
+ return planner.deleteReservation(reservationId, user, plan);
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
new file mode 100644
index 0000000..342c2e7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * A planning algorithm consisting of two main phases. The algorithm iterates
+ * over the job stages in descending order. For each stage, the algorithm: 1.
+ * Determines an interval [stageArrivalTime, stageDeadline) in which the stage
+ * is allocated. 2. Computes an allocation for the stage inside the interval.
+ *
+ * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be
+ * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of
+ * each stage is set as succcessorStartTime - the starting time of its
+ * succeeding stage (or jobDeadline if it is the last stage).
+ *
+ * The phases are set using the two functions: 1. setAlgEarliestStartTime 2.
+ * setAlgComputeStageAllocation
+ */
+public class IterativePlanner extends PlanningAlgorithm {
+
+ // Modifications performed by the algorithm that are not been reflected in the
+ // actual plan while a request is still pending.
+ private RLESparseResourceAllocation planModifications;
+
+ // Data extracted from plan
+ private Map<Long, Resource> planLoads;
+ private Resource capacity;
+ private long step;
+
+ // Job parameters
+ private ReservationRequestInterpreter jobType;
+ private long jobArrival;
+ private long jobDeadline;
+
+ // Phase algorithms
+ private StageEarliestStart algStageEarliestStart = null;
+ private StageAllocator algStageAllocator = null;
+
+ // Constructor
+ public IterativePlanner(StageEarliestStart algEarliestStartTime,
+ StageAllocator algStageAllocator) {
+
+ setAlgStageEarliestStart(algEarliestStartTime);
+ setAlgStageAllocator(algStageAllocator);
+
+ }
+
+ @Override
+ public RLESparseResourceAllocation computeJobAllocation(Plan plan,
+ ReservationId reservationId, ReservationDefinition reservation)
+ throws ContractValidationException {
+
+ // Initialize
+ initialize(plan, reservation);
+
+ // If the job has been previously reserved, logically remove its allocation
+ ReservationAllocation oldReservation =
+ plan.getReservationById(reservationId);
+ if (oldReservation != null) {
+ ignoreOldAllocation(oldReservation);
+ }
+
+ // Create the allocations data structure
+ RLESparseResourceAllocation allocations =
+ new RLESparseResourceAllocation(plan.getResourceCalculator(),
+ plan.getMinimumAllocation());
+
+ // Get a reverse iterator for the set of stages
+ ListIterator<ReservationRequest> li =
+ reservation
+ .getReservationRequests()
+ .getReservationResources()
+ .listIterator(
+ reservation.getReservationRequests().getReservationResources()
+ .size());
+
+ // Current stage
+ ReservationRequest currentReservationStage;
+
+ // Index, points on the current node
+ int index =
+ reservation.getReservationRequests().getReservationResources().size();
+
+ // Stage deadlines
+ long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
+ long successorStartingTime = -1;
+
+ // Iterate the stages in reverse order
+ while (li.hasPrevious()) {
+
+ // Get current stage
+ currentReservationStage = li.previous();
+ index -= 1;
+
+ // Validate that the ReservationRequest respects basic constraints
+ validateInputStage(plan, currentReservationStage);
+
+ // Compute an adjusted earliestStart for this resource
+ // (we need this to provision some space for the ORDER contracts)
+ long stageArrivalTime = reservation.getArrival();
+ if (jobType == ReservationRequestInterpreter.R_ORDER
+ || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+ stageArrivalTime =
+ computeEarliestStartingTime(plan, reservation, index,
+ currentReservationStage, stageDeadline);
+ }
+ stageArrivalTime = stepRoundUp(stageArrivalTime, step);
+ stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
+
+ // Compute the allocation of a single stage
+ Map<ReservationInterval, Resource> curAlloc =
+ computeStageAllocation(plan, currentReservationStage,
+ stageArrivalTime, stageDeadline);
+
+ // If we did not find an allocation, return NULL
+ // (unless it's an ANY job, then we simply continue).
+ if (curAlloc == null) {
+
+ // If it's an ANY job, we can move to the next possible request
+ if (jobType == ReservationRequestInterpreter.R_ANY) {
+ continue;
+ }
+
+ // Otherwise, the job cannot be allocated
+ return null;
+
+ }
+
+ // Get the start & end time of the current allocation
+ Long stageStartTime = findEarliestTime(curAlloc.keySet());
+ Long stageEndTime = findLatestTime(curAlloc.keySet());
+
+ // If we did find an allocation for the stage, add it
+ for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
+ allocations.addInterval(entry.getKey(), entry.getValue());
+ }
+
+ // If this is an ANY clause, we have finished
+ if (jobType == ReservationRequestInterpreter.R_ANY) {
+ break;
+ }
+
+ // If ORDER job, set the stageDeadline of the next stage to be processed
+ if (jobType == ReservationRequestInterpreter.R_ORDER
+ || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+
+ // Verify that there is no gap, in case the job is ORDER_NO_GAP
+ if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
+ && successorStartingTime != -1
+ && successorStartingTime > stageEndTime) {
+
+ return null;
+
+ }
+
+ // Store the stageStartTime and set the new stageDeadline
+ successorStartingTime = stageStartTime;
+ stageDeadline = stageStartTime;
+
+ }
+
+ }
+
+ // If the allocation is empty, return an error
+ if (allocations.isEmpty()) {
+ return null;
+ }
+
+ return allocations;
+
+ }
+
+ protected void initialize(Plan plan, ReservationDefinition reservation) {
+
+ // Get plan step & capacity
+ capacity = plan.getTotalCapacity();
+ step = plan.getStep();
+
+ // Get job parameters (type, arrival time & deadline)
+ jobType = reservation.getReservationRequests().getInterpreter();
+ jobArrival = stepRoundUp(reservation.getArrival(), step);
+ jobDeadline = stepRoundDown(reservation.getDeadline(), step);
+
+ // Dirty read of plan load
+ planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
+
+ // Initialize the plan modifications
+ planModifications =
+ new RLESparseResourceAllocation(plan.getResourceCalculator(),
+ plan.getMinimumAllocation());
+
+ }
+
+ private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
+ long endTime) {
+
+ // Create map
+ Map<Long, Resource> loads = new HashMap<Long, Resource>();
+
+ // Calculate the load for every time slot between [start,end)
+ for (long t = startTime; t < endTime; t += step) {
+ Resource load = plan.getTotalCommittedResources(t);
+ loads.put(t, load);
+ }
+
+ // Return map
+ return loads;
+
+ }
+
+ private void ignoreOldAllocation(ReservationAllocation oldReservation) {
+
+ // If there is no old reservation, return
+ if (oldReservation == null) {
+ return;
+ }
+
+ // Subtract each allocation interval from the planModifications
+ for (Entry<ReservationInterval, Resource> entry : oldReservation
+ .getAllocationRequests().entrySet()) {
+
+ // Read the entry
+ ReservationInterval interval = entry.getKey();
+ Resource resource = entry.getValue();
+
+ // Find the actual request
+ Resource negativeResource = Resources.multiply(resource, -1);
+
+ // Insert it into planModifications as a 'negative' request, to
+ // represent available resources
+ planModifications.addInterval(interval, negativeResource);
+
+ }
+
+ }
+
+ private void validateInputStage(Plan plan, ReservationRequest rr)
+ throws ContractValidationException {
+
+ // Validate concurrency
+ if (rr.getConcurrency() < 1) {
+ throw new ContractValidationException("Gang Size should be >= 1");
+ }
+
+ // Validate number of containers
+ if (rr.getNumContainers() <= 0) {
+ throw new ContractValidationException("Num containers should be > 0");
+ }
+
+ // Check that gangSize and numContainers are compatible
+ if (rr.getNumContainers() % rr.getConcurrency() != 0) {
+ throw new ContractValidationException(
+ "Parallelism must be an exact multiple of gang size");
+ }
+
+ // Check that the largest container request does not exceed the cluster-wide
+ // limit for container sizes
+ if (Resources.greaterThan(plan.getResourceCalculator(), capacity,
+ rr.getCapability(), plan.getMaximumAllocation())) {
+
+ throw new ContractValidationException(
+ "Individual capability requests should not exceed cluster's " +
+ "maxAlloc");
+
+ }
+
+ }
+
+ // Call algEarliestStartTime()
+ protected long computeEarliestStartingTime(Plan plan,
+ ReservationDefinition reservation, int index,
+ ReservationRequest currentReservationStage, long stageDeadline) {
+
+ return algStageEarliestStart.setEarliestStartTime(plan, reservation, index,
+ currentReservationStage, stageDeadline);
+
+ }
+
+ // Call algStageAllocator
+ protected Map<ReservationInterval, Resource> computeStageAllocation(
+ Plan plan, ReservationRequest rr, long stageArrivalTime,
+ long stageDeadline) {
+
+ return algStageAllocator.computeStageAllocation(plan, planLoads,
+ planModifications, rr, stageArrivalTime, stageDeadline);
+
+ }
+
+ // Set the algorithm: algStageEarliestStart
+ public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) {
+
+ this.algStageEarliestStart = alg;
+ return this; // To allow concatenation of setAlg() functions
+
+ }
+
+ // Set the algorithm: algStageAllocator
+ public IterativePlanner setAlgStageAllocator(StageAllocator alg) {
+
+ this.algStageAllocator = alg;
+ return this; // To allow concatenation of setAlg() functions
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
new file mode 100644
index 0000000..abac6ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+public interface Planner {
+
+ /**
+ * Update the existing {@link Plan}, by adding/removing/updating existing
+ * reservations, and adding a subset of the reservation requests in the
+ * contracts parameter.
+ *
+ * @param plan the {@link Plan} to replan
+ * @param contracts the list of reservation requests
+ * @throws PlanningException
+ */
+ public void plan(Plan plan, List<ReservationDefinition> contracts)
+ throws PlanningException;
+
+ /**
+ * Initialize the replanner
+ *
+ * @param planQueueName the name of the queue for this plan
+ * @param conf the scheduler configuration
+ */
+ void init(String planQueueName, ReservationSchedulerConfiguration conf);
+}
[3/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement
Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
(cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)
Posted by cu...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
new file mode 100644
index 0000000..9a0a0f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An abstract class that follows the general behavior of planning algorithms.
+ */
+public abstract class PlanningAlgorithm implements ReservationAgent {
+
+ /**
+ * Performs the actual allocation for a ReservationDefinition within a Plan.
+ *
+ * @param reservationId the identifier of the reservation
+ * @param user the user who owns the reservation
+ * @param plan the Plan to which the reservation must be fitted
+ * @param contract encapsulates the resources required by the user for his
+ * session
+ * @param oldReservation the existing reservation (null if none)
+ * @return whether the allocateUser function was successful or not
+ *
+ * @throws PlanningException if the session cannot be fitted into the plan
+ * @throws ContractValidationException
+ */
+ protected boolean allocateUser(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract,
+ ReservationAllocation oldReservation) throws PlanningException,
+ ContractValidationException {
+
+ // Adjust the ResourceDefinition to account for system "imperfections"
+ // (e.g., scheduling delays for large containers).
+ ReservationDefinition adjustedContract = adjustContract(plan, contract);
+
+ // Compute the job allocation
+ RLESparseResourceAllocation allocation =
+ computeJobAllocation(plan, reservationId, adjustedContract);
+
+ // If no job allocation was found, fail
+ if (allocation == null) {
+ throw new PlanningException(
+ "The planning algorithm could not find a valid allocation"
+ + " for your request");
+ }
+
+ // Translate the allocation to a map (with zero paddings)
+ long step = plan.getStep();
+ long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
+ long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);
+ Map<ReservationInterval, Resource> mapAllocations =
+ allocationsToPaddedMap(allocation, jobArrival, jobDeadline);
+
+ // Create the reservation
+ ReservationAllocation capReservation =
+ new InMemoryReservationAllocation(reservationId, // ID
+ adjustedContract, // Contract
+ user, // User name
+ plan.getQueueName(), // Queue name
+ findEarliestTime(mapAllocations.keySet()), // Earliest start time
+ findLatestTime(mapAllocations.keySet()), // Latest end time
+ mapAllocations, // Allocations
+ plan.getResourceCalculator(), // Resource calculator
+ plan.getMinimumAllocation()); // Minimum allocation
+
+ // Add (or update) the reservation allocation
+ if (oldReservation != null) {
+ return plan.updateReservation(capReservation);
+ } else {
+ return plan.addReservation(capReservation);
+ }
+
+ }
+
+ private Map<ReservationInterval, Resource>
+ allocationsToPaddedMap(RLESparseResourceAllocation allocation,
+ long jobArrival, long jobDeadline) {
+
+ // Allocate
+ Map<ReservationInterval, Resource> mapAllocations =
+ allocation.toIntervalMap();
+
+ // Zero allocation
+ Resource zeroResource = Resource.newInstance(0, 0);
+
+ // Pad at the beginning
+ long earliestStart = findEarliestTime(mapAllocations.keySet());
+ if (jobArrival < earliestStart) {
+ mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
+ zeroResource);
+ }
+
+ // Pad at the beginning
+ long latestEnd = findLatestTime(mapAllocations.keySet());
+ if (latestEnd < jobDeadline) {
+ mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
+ zeroResource);
+ }
+
+ return mapAllocations;
+
+ }
+
+ public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
+ ReservationId reservationId, ReservationDefinition reservation)
+ throws PlanningException, ContractValidationException;
+
+ @Override
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ // Allocate
+ return allocateUser(reservationId, user, plan, contract, null);
+
+ }
+
+ @Override
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ // Get the old allocation
+ ReservationAllocation oldAlloc = plan.getReservationById(reservationId);
+
+ // Allocate (ignores the old allocation)
+ return allocateUser(reservationId, user, plan, contract, oldAlloc);
+
+ }
+
+ @Override
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException {
+
+ // Delete the existing reservation
+ return plan.deleteReservation(reservationId);
+
+ }
+
+ protected static long findEarliestTime(Set<ReservationInterval> sesInt) {
+
+ long ret = Long.MAX_VALUE;
+ for (ReservationInterval s : sesInt) {
+ if (s.getStartTime() < ret) {
+ ret = s.getStartTime();
+ }
+ }
+ return ret;
+
+ }
+
+ protected static long findLatestTime(Set<ReservationInterval> sesInt) {
+
+ long ret = Long.MIN_VALUE;
+ for (ReservationInterval s : sesInt) {
+ if (s.getEndTime() > ret) {
+ ret = s.getEndTime();
+ }
+ }
+ return ret;
+
+ }
+
+ protected static long stepRoundDown(long t, long step) {
+ return (t / step) * step;
+ }
+
+ protected static long stepRoundUp(long t, long step) {
+ return ((t + step - 1) / step) * step;
+ }
+
+ private ReservationDefinition adjustContract(Plan plan,
+ ReservationDefinition originalContract) {
+
+ // Place here adjustment. For example using QueueMetrics we can track
+ // large container delays per YARN-YARN-1990
+
+ return originalContract;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
new file mode 100644
index 0000000..bdea2f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An entity that seeks to acquire resources to satisfy an user's contract
+ */
+public interface ReservationAgent {
+
+ /**
+ * Create a reservation for the user that abides by the specified contract
+ *
+ * @param reservationId the identifier of the reservation to be created.
+ * @param user the user who wants to create the reservation
+ * @param plan the Plan to which the reservation must be fitted
+ * @param contract encapsulates the resources the user requires for his
+ * session
+ *
+ * @return whether the create operation was successful or not
+ * @throws PlanningException if the session cannot be fitted into the plan
+ */
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException;
+
+ /**
+ * Update a reservation for the user that abides by the specified contract
+ *
+ * @param reservationId the identifier of the reservation to be updated
+ * @param user the user who wants to create the session
+ * @param plan the Plan to which the reservation must be fitted
+ * @param contract encapsulates the resources the user requires for his
+ * reservation
+ *
+ * @return whether the update operation was successful or not
+ * @throws PlanningException if the reservation cannot be fitted into the plan
+ */
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException;
+
+ /**
+ * Delete an user reservation
+ *
+ * @param reservationId the identifier of the reservation to be deleted
+ * @param user the user who wants to create the reservation
+ * @param plan the Plan to which the session must be fitted
+ *
+ * @return whether the delete operation was successful or not
+ * @throws PlanningException if the reservation cannot be fitted into the plan
+ */
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
new file mode 100644
index 0000000..7507783
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This (re)planner scan a period of time from now to a maximum time window (or
+ * the end of the last session, whichever comes first) checking the overall
+ * capacity is not violated.
+ *
+ * It greedily removes sessions in reversed order of acceptance (latest accepted
+ * is the first removed).
+ */
+public class SimpleCapacityReplanner implements Planner {
+
+ private static final Log LOG = LogFactory
+ .getLog(SimpleCapacityReplanner.class);
+
+ private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+ private final Clock clock;
+
+ // this allows to control to time-span of this replanning
+ // far into the future time instants might be worth replanning for
+ // later on
+ private long lengthOfCheckZone;
+
+ public SimpleCapacityReplanner() {
+ this(new UTCClock());
+ }
+
+ @VisibleForTesting
+ SimpleCapacityReplanner(Clock clock) {
+ this.clock = clock;
+ }
+
+ @Override
+ public void init(String planQueueName,
+ ReservationSchedulerConfiguration conf) {
+ this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
+ }
+
+ @Override
+ public void plan(Plan plan, List<ReservationDefinition> contracts)
+ throws PlanningException {
+
+ if (contracts != null) {
+ throw new RuntimeException(
+ "SimpleCapacityReplanner cannot handle new reservation contracts");
+ }
+
+ ResourceCalculator resCalc = plan.getResourceCalculator();
+ Resource totCap = plan.getTotalCapacity();
+ long now = clock.getTime();
+
+ // loop on all moment in time from now to the end of the check Zone
+ // or the end of the planned sessions whichever comes first
+ for (long t = now;
+ (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone));
+ t += plan.getStep()) {
+ Resource excessCap =
+ Resources.subtract(plan.getTotalCommittedResources(t), totCap);
+ // if we are violating
+ if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
+ // sorted on reverse order of acceptance, so newest reservations first
+ Set<ReservationAllocation> curReservations =
+ new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
+ for (Iterator<ReservationAllocation> resIter =
+ curReservations.iterator(); resIter.hasNext()
+ && Resources.greaterThan(resCalc, totCap, excessCap,
+ ZERO_RESOURCE);) {
+ ReservationAllocation reservation = resIter.next();
+ plan.deleteReservation(reservation.getReservationId());
+ excessCap =
+ Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
+ LOG.info("Removing reservation " + reservation.getReservationId()
+ + " to repair physical-resource constraints in the plan: "
+ + plan.getQueueName());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
new file mode 100644
index 0000000..9df6b74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+
+/**
+ * Interface for allocating a single stage in IterativePlanner.
+ */
+public interface StageAllocator {
+
+ /**
+ * Computes the allocation of a stage inside a defined time interval.
+ *
+ * @param plan the Plan to which the reservation must be fitted
+ * @param planLoads a 'dirty' read of the plan loads at each time
+ * @param planModifications the allocations performed by the planning
+ * algorithm which are not yet reflected by plan
+ * @param rr the stage
+ * @param stageEarliestStart the arrival time (earliest starting time) set for
+ * the stage by the two phase planning algorithm
+ * @param stageDeadline the deadline of the stage set by the two phase
+ * planning algorithm
+ *
+ * @return The computed allocation (or null if the stage could not be
+ * allocated)
+ */
+ Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+ Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, ReservationRequest rr,
+ long stageEarliestStart, long stageDeadline);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
new file mode 100644
index 0000000..773fbdf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Computes the stage allocation according to the greedy allocation rule. The
+ * greedy rule repeatedly allocates requested containers at the rightmost
+ * (latest) free interval.
+ */
+
+public class StageAllocatorGreedy implements StageAllocator {
+
+ @Override
+ public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+ Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, ReservationRequest rr,
+ long stageEarliestStart, long stageDeadline) {
+
+ Resource totalCapacity = plan.getTotalCapacity();
+
+ Map<ReservationInterval, Resource> allocationRequests =
+ new HashMap<ReservationInterval, Resource>();
+
+ // compute the gang as a resource and get the duration
+ Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
+ long dur = rr.getDuration();
+ long step = plan.getStep();
+
+ // ceil the duration to the next multiple of the plan step
+ if (dur % step != 0) {
+ dur += (step - (dur % step));
+ }
+
+ // we know for sure that this division has no remainder (part of contract
+ // with user, validate before
+ int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
+
+ int maxGang = 0;
+
+ // loop trying to place until we are done, or we are considering
+ // an invalid range of times
+ while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {
+
+ // as we run along we remember how many gangs we can fit, and what
+ // was the most constraining moment in time (we will restart just
+ // after that to place the next batch)
+ maxGang = gangsToPlace;
+ long minPoint = stageDeadline;
+ int curMaxGang = maxGang;
+
+ // start placing at deadline (excluded due to [,) interval semantics and
+ // move backward
+ for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
+ && maxGang > 0; t = t - plan.getStep()) {
+
+ // compute net available resources
+ Resource netAvailableRes = Resources.clone(totalCapacity);
+ // Resources.addTo(netAvailableRes, oldResCap);
+ Resources.subtractFrom(netAvailableRes,
+ plan.getTotalCommittedResources(t));
+ Resources.subtractFrom(netAvailableRes,
+ planModifications.getCapacityAtTime(t));
+
+ // compute maximum number of gangs we could fit
+ curMaxGang =
+ (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
+ totalCapacity, netAvailableRes, gang));
+
+ // pick the minimum between available resources in this instant, and how
+ // many gangs we have to place
+ curMaxGang = Math.min(gangsToPlace, curMaxGang);
+
+ // compare with previous max, and set it. also remember *where* we found
+ // the minimum (useful for next attempts)
+ if (curMaxGang <= maxGang) {
+ maxGang = curMaxGang;
+ minPoint = t;
+ }
+ }
+
+ // if we were able to place any gang, record this, and decrement
+ // gangsToPlace
+ if (maxGang > 0) {
+ gangsToPlace -= maxGang;
+
+ ReservationInterval reservationInt =
+ new ReservationInterval(stageDeadline - dur, stageDeadline);
+ Resource reservationRes =
+ Resources.multiply(rr.getCapability(), rr.getConcurrency()
+ * maxGang);
+ // remember occupied space (plan is read-only till we find a plausible
+ // allocation for the entire request). This is needed since we might be
+ // placing other ReservationRequest within the same
+ // ReservationDefinition,
+ // and we must avoid double-counting the available resources
+ planModifications.addInterval(reservationInt, reservationRes);
+ allocationRequests.put(reservationInt, reservationRes);
+
+ }
+
+ // reset our new starting point (curDeadline) to the most constraining
+ // point so far, we will look "left" of that to find more places where
+ // to schedule gangs (for sure nothing on the "right" of this point can
+ // fit a full gang.
+ stageDeadline = minPoint;
+ }
+
+ // if no gangs are left to place we succeed and return the allocation
+ if (gangsToPlace == 0) {
+ return allocationRequests;
+ } else {
+ // If we are here is becasue we did not manage to satisfy this request.
+ // So we need to remove unwanted side-effect from tempAssigned (needed
+ // for ANY).
+ for (Map.Entry<ReservationInterval, Resource> tempAllocation
+ : allocationRequests.entrySet()) {
+ planModifications.removeInterval(tempAllocation.getKey(),
+ tempAllocation.getValue());
+ }
+ // and return null to signal failure in this allocation
+ return null;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
new file mode 100644
index 0000000..4b5763d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * A stage allocator that iteratively allocates containers in the
+ * {@link DurationInterval} with lowest overall cost. The algorithm only
+ * considers intervals of the form: [stageDeadline - (n+1)*duration,
+ * stageDeadline - n*duration) for an integer n. This guarantees that the
+ * allocations are aligned (as opposed to overlapping duration intervals).
+ *
+ * The smoothnessFactor parameter controls the number of containers that are
+ * simultaneously allocated in each iteration of the algorithm.
+ */
+
+public class StageAllocatorLowCostAligned implements StageAllocator {
+
+ // Smoothness factor
+ private int smoothnessFactor = 10;
+
+ // Constructor
+ public StageAllocatorLowCostAligned() {
+ }
+
+ // Constructor
+ public StageAllocatorLowCostAligned(int smoothnessFactor) {
+ this.smoothnessFactor = smoothnessFactor;
+ }
+
+ // computeJobAllocation()
+ @Override
+ public Map<ReservationInterval, Resource> computeStageAllocation(
+ Plan plan, Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, ReservationRequest rr,
+ long stageEarliestStart, long stageDeadline) {
+
+ // Initialize
+ ResourceCalculator resCalc = plan.getResourceCalculator();
+ Resource capacity = plan.getTotalCapacity();
+ long step = plan.getStep();
+
+ // Create allocationRequestsearlies
+ RLESparseResourceAllocation allocationRequests =
+ new RLESparseResourceAllocation(plan.getResourceCalculator(),
+ plan.getMinimumAllocation());
+
+ // Initialize parameters
+ long duration = stepRoundUp(rr.getDuration(), step);
+ int windowSizeInDurations =
+ (int) ((stageDeadline - stageEarliestStart) / duration);
+ int totalGangs = rr.getNumContainers() / rr.getConcurrency();
+ int numContainersPerGang = rr.getConcurrency();
+ Resource gang =
+ Resources.multiply(rr.getCapability(), numContainersPerGang);
+
+ // Set maxGangsPerUnit
+ int maxGangsPerUnit =
+ (int) Math.max(
+ Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
+ maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
+
+ // If window size is too small, return null
+ if (windowSizeInDurations <= 0) {
+ return null;
+ }
+
+ // Initialize tree sorted by costs
+ TreeSet<DurationInterval> durationIntervalsSortedByCost =
+ new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
+ @Override
+ public int compare(DurationInterval val1, DurationInterval val2) {
+
+ int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost());
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime());
+ }
+ });
+
+ // Add durationIntervals that end at (endTime - n*duration) for some n.
+ for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
+ + duration; intervalEnd -= duration) {
+
+ long intervalStart = intervalEnd - duration;
+
+ // Get duration interval [intervalStart,intervalEnd)
+ DurationInterval durationInterval =
+ getDurationInterval(intervalStart, intervalEnd, planLoads,
+ planModifications, capacity, resCalc, step);
+
+ // If the interval can fit a gang, add it to the tree
+ if (durationInterval.canAllocate(gang, capacity, resCalc)) {
+ durationIntervalsSortedByCost.add(durationInterval);
+ }
+ }
+
+ // Allocate
+ int remainingGangs = totalGangs;
+ while (remainingGangs > 0) {
+
+ // If no durationInterval can fit a gang, break and return null
+ if (durationIntervalsSortedByCost.isEmpty()) {
+ break;
+ }
+
+ // Get best duration interval
+ DurationInterval bestDurationInterval =
+ durationIntervalsSortedByCost.first();
+ int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
+
+ // Add it
+ remainingGangs -= numGangsToAllocate;
+
+ ReservationInterval reservationInt =
+ new ReservationInterval(bestDurationInterval.getStartTime(),
+ bestDurationInterval.getEndTime());
+
+ Resource reservationRes =
+ Resources.multiply(rr.getCapability(), rr.getConcurrency()
+ * numGangsToAllocate);
+
+ planModifications.addInterval(reservationInt, reservationRes);
+ allocationRequests.addInterval(reservationInt, reservationRes);
+
+ // Remove from tree
+ durationIntervalsSortedByCost.remove(bestDurationInterval);
+
+ // Get updated interval
+ DurationInterval updatedDurationInterval =
+ getDurationInterval(bestDurationInterval.getStartTime(),
+ bestDurationInterval.getStartTime() + duration, planLoads,
+ planModifications, capacity, resCalc, step);
+
+ // Add to tree, if possible
+ if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) {
+ durationIntervalsSortedByCost.add(updatedDurationInterval);
+ }
+
+ }
+
+ // Get the final allocation
+ Map<ReservationInterval, Resource> allocations =
+ allocationRequests.toIntervalMap();
+
+ // If no gangs are left to place we succeed and return the allocation
+ if (remainingGangs <= 0) {
+ return allocations;
+ } else {
+
+ // If we are here is because we did not manage to satisfy this request.
+ // We remove unwanted side-effect from planModifications (needed for ANY).
+ for (Map.Entry<ReservationInterval, Resource> tempAllocation
+ : allocations.entrySet()) {
+
+ planModifications.removeInterval(tempAllocation.getKey(),
+ tempAllocation.getValue());
+
+ }
+ // Return null to signal failure in this allocation
+ return null;
+
+ }
+
+ }
+
+ protected DurationInterval getDurationInterval(long startTime, long endTime,
+ Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, Resource capacity,
+ ResourceCalculator resCalc, long step) {
+
+ // Initialize the dominant loads structure
+ Resource dominantResources = Resource.newInstance(0, 0);
+
+ // Calculate totalCost and maxLoad
+ double totalCost = 0.0;
+ for (long t = startTime; t < endTime; t += step) {
+
+ // Get the load
+ Resource load = getLoadAtTime(t, planLoads, planModifications);
+
+ // Increase the total cost
+ totalCost += calcCostOfLoad(load, capacity, resCalc);
+
+ // Update the dominant resources
+ dominantResources = Resources.componentwiseMax(dominantResources, load);
+
+ }
+
+ // Return the corresponding durationInterval
+ return new DurationInterval(startTime, endTime, totalCost,
+ dominantResources);
+
+ }
+
+ protected double calcCostOfInterval(long startTime, long endTime,
+ Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, Resource capacity,
+ ResourceCalculator resCalc, long step) {
+
+ // Sum costs in the interval [startTime,endTime)
+ double totalCost = 0.0;
+ for (long t = startTime; t < endTime; t += step) {
+ totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity,
+ resCalc);
+ }
+
+ // Return sum
+ return totalCost;
+
+ }
+
+ protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, Resource capacity,
+ ResourceCalculator resCalc) {
+
+ // Get the current load at time t
+ Resource load = getLoadAtTime(t, planLoads, planModifications);
+
+ // Return cost
+ return calcCostOfLoad(load, capacity, resCalc);
+
+ }
+
+ protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications) {
+
+ Resource planLoad = planLoads.get(t);
+ planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad;
+
+ return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
+
+ }
+
+ protected double calcCostOfLoad(Resource load, Resource capacity,
+ ResourceCalculator resCalc) {
+
+ return resCalc.ratio(load, capacity);
+
+ }
+
+ protected static long stepRoundDown(long t, long step) {
+ return (t / step) * step;
+ }
+
+ protected static long stepRoundUp(long t, long step) {
+ return ((t + step - 1) / step) * step;
+ }
+
+ /**
+ * An inner class that represents an interval, typically of length duration.
+ * The class holds the total cost of the interval and the maximal load inside
+ * the interval in each dimension (both calculated externally).
+ */
+ protected static class DurationInterval {
+
+ private long startTime;
+ private long endTime;
+ private double cost;
+ private Resource maxLoad;
+
+ // Constructor
+ public DurationInterval(long startTime, long endTime, double cost,
+ Resource maxLoad) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.cost = cost;
+ this.maxLoad = maxLoad;
+ }
+
+ // canAllocate() - boolean function, returns whether requestedResources
+ // can be allocated during the durationInterval without
+ // violating capacity constraints
+ public boolean canAllocate(Resource requestedResources, Resource capacity,
+ ResourceCalculator resCalc) {
+
+ Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources);
+ return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0);
+
+ }
+
+ // numCanFit() - returns the maximal number of requestedResources can be
+ // allocated during the durationInterval without violating
+ // capacity constraints
+ public int numCanFit(Resource requestedResources, Resource capacity,
+ ResourceCalculator resCalc) {
+
+ // Represents the largest resource demand that can be satisfied throughout
+ // the entire DurationInterval (i.e., during [startTime,endTime))
+ Resource availableResources = Resources.subtract(capacity, maxLoad);
+
+ // Maximal number of requestedResources that fit inside the interval
+ return (int) Math.floor(Resources.divide(resCalc, capacity,
+ availableResources, requestedResources));
+
+ }
+
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ public void setStartTime(long value) {
+ this.startTime = value;
+ }
+
+ public long getEndTime() {
+ return this.endTime;
+ }
+
+ public void setEndTime(long value) {
+ this.endTime = value;
+ }
+
+ public Resource getMaxLoad() {
+ return this.maxLoad;
+ }
+
+ public void setMaxLoad(Resource value) {
+ this.maxLoad = value;
+ }
+
+ public double getTotalCost() {
+ return this.cost;
+ }
+
+ public void setTotalCost(double value) {
+ this.cost = value;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
new file mode 100644
index 0000000..547616a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Interface for setting the earliest start time of a stage in IterativePlanner.
+ */
+public interface StageEarliestStart {
+
+ /**
+ * Computes the earliest allowed starting time for a given stage.
+ *
+ * @param plan the Plan to which the reservation must be fitted
+ * @param reservation the job contract
+ * @param index the index of the stage in the job contract
+ * @param currentReservationStage the stage
+ * @param stageDeadline the deadline of the stage set by the two phase
+ * planning algorithm
+ *
+ * @return the earliest allowed starting time for the stage.
+ */
+ long setEarliestStartTime(Plan plan, ReservationDefinition reservation,
+ int index, ReservationRequest currentReservationStage,
+ long stageDeadline);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
new file mode 100644
index 0000000..5a46a4e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.ListIterator;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Sets the earliest start time of a stage proportional to the job weight. The
+ * interval [jobArrival, stageDeadline) is divided as follows. First, each stage
+ * is guaranteed at least its requested duration. Then, the stage receives a
+ * fraction of the remaining time. The fraction is calculated as the ratio
+ * between the weight (total requested resources) of the stage and the total
+ * weight of all proceeding stages.
+ */
+
+public class StageEarliestStartByDemand implements StageEarliestStart {
+
+ private long step;
+
+ @Override
+ public long setEarliestStartTime(Plan plan,
+ ReservationDefinition reservation, int index, ReservationRequest current,
+ long stageDeadline) {
+
+ step = plan.getStep();
+
+ // If this is the first stage, don't bother with the computation.
+ if (index < 1) {
+ return reservation.getArrival();
+ }
+
+ // Get iterator
+ ListIterator<ReservationRequest> li =
+ reservation.getReservationRequests().getReservationResources()
+ .listIterator(index);
+ ReservationRequest rr;
+
+ // Calculate the total weight & total duration
+ double totalWeight = calcWeight(current);
+ long totalDuration = getRoundedDuration(current, plan);
+
+ while (li.hasPrevious()) {
+ rr = li.previous();
+ totalWeight += calcWeight(rr);
+ totalDuration += getRoundedDuration(rr, plan);
+ }
+
+ // Compute the weight of the current stage as compared to remaining ones
+ double ratio = calcWeight(current) / totalWeight;
+
+ // Estimate an early start time, such that:
+ // 1. Every stage is guaranteed to receive at least its duration
+ // 2. The remainder of the window is divided between stages
+ // proportionally to its workload (total memory consumption)
+ long window = stageDeadline - reservation.getArrival();
+ long windowRemainder = window - totalDuration;
+ long earlyStart =
+ (long) (stageDeadline - getRoundedDuration(current, plan)
+ - (windowRemainder * ratio));
+
+ // Realign if necessary (since we did some arithmetic)
+ earlyStart = stepRoundUp(earlyStart, step);
+
+ // Return
+ return earlyStart;
+
+ }
+
+ // Weight = total memory consumption of stage
+ protected double calcWeight(ReservationRequest stage) {
+ return (stage.getDuration() * stage.getCapability().getMemory())
+ * (stage.getNumContainers());
+ }
+
+ protected long getRoundedDuration(ReservationRequest stage, Plan plan) {
+ return stepRoundUp(stage.getDuration(), step);
+ }
+
+ protected static long stepRoundDown(long t, long step) {
+ return (t / step) * step;
+ }
+
+ protected static long stepRoundUp(long t, long step) {
+ return ((t + step - 1) / step) * step;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
new file mode 100644
index 0000000..8347816
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Sets the earliest start time of a stage as the job arrival time.
+ */
+public class StageEarliestStartByJobArrival implements StageEarliestStart {
+
+ @Override
+ public long setEarliestStartTime(Plan plan,
+ ReservationDefinition reservation, int index, ReservationRequest current,
+ long stageDeadline) {
+
+ return reservation.getArrival();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
new file mode 100644
index 0000000..1d37ce5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * A planning algorithm that invokes several other planning algorithms according
+ * to a given order. If one of the planners succeeds, the allocation it
+ * generates is returned.
+ */
+public class TryManyReservationAgents implements ReservationAgent {
+
+ // Planning algorithms
+ private final List<ReservationAgent> algs;
+
+ // Constructor
+ public TryManyReservationAgents(List<ReservationAgent> algs) {
+ this.algs = new LinkedList<ReservationAgent>(algs);
+ }
+
+ @Override
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ // Save the planning exception
+ PlanningException planningException = null;
+
+ // Try all of the algorithms, in order
+ for (ReservationAgent alg : algs) {
+
+ try {
+ if (alg.createReservation(reservationId, user, plan, contract)) {
+ return true;
+ }
+ } catch (PlanningException e) {
+ planningException = e;
+ }
+
+ }
+
+ // If all of the algorithms failed and one of the algorithms threw an
+ // exception, throw the last planning exception
+ if (planningException != null) {
+ throw planningException;
+ }
+
+ // If all of the algorithms failed, return false
+ return false;
+
+ }
+
+ @Override
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ // Save the planning exception
+ PlanningException planningException = null;
+
+ // Try all of the algorithms, in order
+ for (ReservationAgent alg : algs) {
+
+ try {
+ if (alg.updateReservation(reservationId, user, plan, contract)) {
+ return true;
+ }
+ } catch (PlanningException e) {
+ planningException = e;
+ }
+
+ }
+
+ // If all of the algorithms failed and one of the algorithms threw an
+ // exception, throw the last planning exception
+ if (planningException != null) {
+ throw planningException;
+ }
+
+ // If all of the algorithms failed, return false
+ return false;
+
+ }
+
+ @Override
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException {
+
+ return plan.deleteReservation(reservationId);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index be1d69a..adb9dcf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -89,7 +90,7 @@ public class ReservationSystemTestUtil {
Assert.assertEquals(planQName, plan.getQueueName());
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
Assert.assertTrue(
- plan.getReservationAgent() instanceof GreedyReservationAgent);
+ plan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
Assert.assertTrue(
plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
@@ -102,7 +103,7 @@ public class ReservationSystemTestUtil {
Assert.assertEquals(newQ, newPlan.getQueueName());
Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
Assert
- .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
+ .assertTrue(newPlan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
Assert
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 19f876d..f608c3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
-
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
index b8663f6..15f9a89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
index f294eaf..4b685b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
index e9a4f50..43316f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;