You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/07/27 23:58:38 UTC
[27/37] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement
Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
new file mode 100644
index 0000000..bd18a2f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
@@ -0,0 +1,611 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestGreedyReservationAgent {
+
+ ReservationAgent agent;
+ InMemoryPlan plan;
+ Resource minAlloc = Resource.newInstance(1024, 1);
+ ResourceCalculator res = new DefaultResourceCalculator();
+ Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+ Random rand = new Random();
+ long step;
+
+ @Before
+ public void setup() throws Exception {
+
+ long seed = rand.nextLong();
+ rand.setSeed(seed);
+ Log.info("Running with seed: " + seed);
+
+ // setting completely loose quotas
+ long timeWindow = 1000000L;
+ Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
+ step = 1000L;
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ String reservationQ = testUtil.getFullReservationQueueName();
+
+ float instConstraint = 100;
+ float avgConstraint = 100;
+
+ ReservationSchedulerConfiguration conf =
+ ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
+ instConstraint, avgConstraint);
+ CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+ policy.init(reservationQ, conf);
+ agent = new GreedyReservationAgent();
+
+ QueueMetrics queueMetrics = mock(QueueMetrics.class);
+
+ plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+ res, minAlloc, maxAlloc, "dedicated", null, true);
+ }
+
+ @SuppressWarnings("javadoc")
+ @Test
+ public void testSimple() throws PlanningException {
+
+ prepareBasicPlan();
+
+ // create a request with a single atomic ask
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(5 * step);
+ rr.setDeadline(20 * step);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 5, 10 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setReservationResources(Collections.singletonList(r));
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ for (long i = 10 * step; i < 20 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 10, 2 * 10)));
+ }
+
+ }
+
+ @Test
+ public void testOrder() throws PlanningException {
+ prepareBasicPlan();
+
+ // create a completely utilized segment around time 30
+ int[] f = { 100, 100 };
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 30 * step, 30 * step + f.length * step,
+ ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+ res, minAlloc)));
+
+ // create a chain of 4 RR, mixing gang and non-gang
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(0 * step);
+ rr.setDeadline(70 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 1, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 10, 10, 20 * step);
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 4);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
+
+ System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testOrderNoGapImpossible() throws PlanningException {
+ prepareBasicPlan();
+ // create a completely utilized segment at time 30
+ int[] f = { 100, 100 };
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 30 * step, 30 * step + f.length * step,
+ ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+ res, minAlloc)));
+
+ // create a chain of 4 RR, mixing gang and non-gang
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(0L);
+
+ rr.setDeadline(70L);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 1, 10);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 10, 10, 20);
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ boolean result = false;
+ try {
+ // submit to agent
+ result = agent.createReservation(reservationID, "u1", plan, rr);
+ fail();
+ } catch (PlanningException p) {
+ // expected
+ }
+
+ // validate
+ assertFalse("Agent-based allocation should have failed", result);
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == 3);
+
+ System.out
+ .println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testOrderNoGap() throws PlanningException {
+ prepareBasicPlan();
+ // create a chain of 4 RR, mixing gang and non-gang
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(0 * step);
+ rr.setDeadline(60 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 1, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 10, 10, 20 * step);
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ // validate
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1));
+
+ }
+
+ @Test
+ public void testSingleSliding() throws PlanningException {
+ prepareBasicPlan();
+
+ // create a single request for which we need subsequent (tight) packing.
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100 * step);
+ rr.setDeadline(120 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 200, 10, 10 * step);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate results, we expect the second one to be accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1));
+
+ System.out.println("--------AFTER packed ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAny() throws PlanningException {
+ prepareBasicPlan();
+ // create an ANY request, with an impossible step (last in list, first
+ // considered),
+ // and two satisfiable ones. We expect the second one to be returned.
+
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100 * step);
+ rr.setDeadline(120 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 5, 5, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 5, 10 * step);
+ ReservationRequest r3 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 110, 110, 10 * step);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r3);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ boolean res = agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate results, we expect the second one to be accepted
+ assertTrue("Agent-based allocation failed", res);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
+
+ System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+ + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAnyImpossible() throws PlanningException {
+ prepareBasicPlan();
+ // create an ANY request, with all impossible alternatives
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100L);
+ rr.setDeadline(120L);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
+
+ // longer than arrival-deadline
+ ReservationRequest r1 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 35, 5, 30);
+ // above max cluster size
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 110, 110, 10);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r1);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ boolean result = false;
+ try {
+ // submit to agent
+ result = agent.createReservation(reservationID, "u1", plan, rr);
+ fail();
+ } catch (PlanningException p) {
+ // expected
+ }
+ // validate results, we expect the second one to be accepted
+ assertFalse("Agent-based allocation should have failed", result);
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == 2);
+
+ System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAll() throws PlanningException {
+ prepareBasicPlan();
+ // create an ALL request
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100 * step);
+ rr.setDeadline(120 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 5, 5, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 10, 20 * step);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate results, we expect the second one to be accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
+
+ System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
+ + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAllImpossible() throws PlanningException {
+ prepareBasicPlan();
+ // create an ALL request, with an impossible combination, it should be
+ // rejected, and allocation remain unchanged
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100L);
+ rr.setDeadline(120L);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 55, 5, 10);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 55, 5, 20);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ boolean result = false;
+ try {
+ // submit to agent
+ result = agent.createReservation(reservationID, "u1", plan, rr);
+ fail();
+ } catch (PlanningException p) {
+ // expected
+ }
+
+ // validate results, we expect the second one to be accepted
+ assertFalse("Agent-based allocation failed", result);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 2);
+
+ System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ private void prepareBasicPlan() throws PlanningException {
+
+ // insert in the reservation a couple of controlled reservations, to create
+ // conditions for assignment that are non-empty
+
+ int[] f = { 10, 10, 20, 20, 20, 10, 10 };
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
+ .generateAllocation(0, step, f), res, minAlloc)));
+
+ int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
+ Map<ReservationInterval, Resource> alloc =
+ ReservationSystemTestUtil.generateAllocation(5000, step, f2);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
+
+ System.out.println("--------BEFORE AGENT----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+ }
+
+ private boolean check(ReservationAllocation cs, long start, long end,
+ int containers, int mem, int cores) {
+
+ boolean res = true;
+ for (long i = start; i < end; i++) {
+ res = res
+ && Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(mem * containers, cores * containers));
+ }
+ return res;
+ }
+
+ public void testStress(int numJobs) throws PlanningException, IOException {
+
+ long timeWindow = 1000000L;
+ Resource clusterCapacity = Resource.newInstance(500 * 100 * 1024, 500 * 32);
+ step = 1000L;
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
+ String reservationQ = testUtil.getFullReservationQueueName();
+ float instConstraint = 100;
+ float avgConstraint = 100;
+ ReservationSchedulerConfiguration conf =
+ ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
+ instConstraint, avgConstraint);
+ CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+ policy.init(reservationQ, conf);
+
+ plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
+ clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
+
+ int acc = 0;
+ List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
+ for (long i = 0; i < numJobs; i++) {
+ list.add(ReservationSystemTestUtil.generateRandomRR(rand, i));
+ }
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < numJobs; i++) {
+
+ try {
+ if (agent.createReservation(
+ ReservationSystemTestUtil.getNewReservationId(), "u" + i % 100,
+ plan, list.get(i))) {
+ acc++;
+ }
+ } catch (PlanningException p) {
+ // ignore exceptions
+ }
+ }
+
+ long end = System.currentTimeMillis();
+ System.out.println("Submitted " + numJobs + " jobs " + " accepted " + acc
+ + " in " + (end - start) + "ms");
+ }
+
+ public static void main(String[] arg) {
+
+ // run a stress test with by default 1000 random jobs
+ int numJobs = 1000;
+ if (arg.length > 0) {
+ numJobs = Integer.parseInt(arg[0]);
+ }
+
+ try {
+ TestGreedyReservationAgent test = new TestGreedyReservationAgent();
+ test.setup();
+ test.testStress(numJobs);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
new file mode 100644
index 0000000..aeb1e6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
@@ -0,0 +1,170 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Test;
+
+public class TestSimpleCapacityReplanner {
+
+ @Test
+ public void testReplanningPlanCapacityLoss() throws PlanningException {
+
+ Resource clusterCapacity = Resource.newInstance(100 * 1024, 10);
+ Resource minAlloc = Resource.newInstance(1024, 1);
+ Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+ ResourceCalculator res = new DefaultResourceCalculator();
+ long step = 1L;
+ Clock clock = mock(Clock.class);
+ ReservationAgent agent = mock(ReservationAgent.class);
+
+ SharingPolicy policy = new NoOverCommitPolicy();
+ policy.init("root.dedicated", null);
+
+ QueueMetrics queueMetrics = mock(QueueMetrics.class);
+
+ when(clock.getTime()).thenReturn(0L);
+ SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
+
+ ReservationSchedulerConfiguration conf =
+ mock(ReservationSchedulerConfiguration.class);
+ when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
+
+ enf.init("blah", conf);
+
+ // Initialize the plan with more resources
+ InMemoryPlan plan =
+ new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+ res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
+
+ // add reservation filling the plan (separating them 1ms, so we are sure
+ // s2 follows s1 on acceptance
+ long ts = System.currentTimeMillis();
+ ReservationId r1 = ReservationId.newInstance(ts, 1);
+ int[] f5 = { 20, 20, 20, 20, 20 };
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(1L);
+ ReservationId r2 = ReservationId.newInstance(ts, 2);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(2L);
+ ReservationId r3 = ReservationId.newInstance(ts, 3);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(3L);
+ ReservationId r4 = ReservationId.newInstance(ts, 4);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(4L);
+ ReservationId r5 = ReservationId.newInstance(ts, 5);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
+ "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+ minAlloc)));
+
+ int[] f6 = { 50, 50, 50, 50, 50 };
+ ReservationId r6 = ReservationId.newInstance(ts, 6);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
+ "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+ minAlloc)));
+ when(clock.getTime()).thenReturn(6L);
+ ReservationId r7 = ReservationId.newInstance(ts, 7);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
+ "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+ minAlloc)));
+
+ // remove some of the resources (requires replanning)
+ plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));
+
+ when(clock.getTime()).thenReturn(0L);
+
+ // run the replanner
+ enf.plan(plan, null);
+
+ // check which reservation are still present
+ assertNotNull(plan.getReservationById(r1));
+ assertNotNull(plan.getReservationById(r2));
+ assertNotNull(plan.getReservationById(r3));
+ assertNotNull(plan.getReservationById(r6));
+ assertNotNull(plan.getReservationById(r7));
+
+ // and which ones are removed
+ assertNull(plan.getReservationById(r4));
+ assertNull(plan.getReservationById(r5));
+
+ // check resources at each moment in time no more exceed capacity
+ for (int i = 0; i < 20; i++) {
+ int tot = 0;
+ for (ReservationAllocation r : plan.getReservationsAtTime(i)) {
+ tot = r.getResourcesAtTime(i).getMemory();
+ }
+ assertTrue(tot <= 70 * 1024);
+ }
+ }
+
+ private Map<ReservationInterval, Resource> generateAllocation(
+ int startTime, int[] alloc) {
+ Map<ReservationInterval, Resource> req =
+ new TreeMap<ReservationInterval, Resource>();
+ for (int i = 0; i < alloc.length; i++) {
+ req.put(new ReservationInterval(startTime + i, startTime + i + 1),
+ ReservationSystemUtil.toResource(
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ alloc[i])));
+ }
+ return req;
+ }
+
+}