You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:38 UTC
[23/33] YARN-2080. Integrating reservation system with
ResourceManager and client-RM protocol. Contributed by Subru Krishnan and
Carlo Curino. (cherry picked from commit
8baeaead8532898163f1006276b731a237b1a559)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index db867a9..954e21d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -33,6 +33,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -74,6 +75,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -91,7 +98,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -107,6 +119,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -117,11 +130,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -1199,4 +1217,102 @@ public class TestClientRMService {
when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
return yarnScheduler;
}
+
+ @Test
+ public void testReservationAPIs() {
+ // initialize
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ ReservationSystemTestUtil.setupQueueConfiguration(conf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+ MockRM rm = new MockRM(conf);
+ rm.start();
+ MockNM nm;
+ try {
+ nm = rm.registerNode("127.0.0.1:0", 102400, 100);
+ // allow plan follower to synchronize
+ Thread.sleep(1050);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // Create a client.
+ ClientRMService clientService = rm.getClientRMService();
+
+ // create a reservation
+ Clock clock = new UTCClock();
+ long arrival = clock.getTime();
+ long duration = 60000;
+ long deadline = (long) (arrival + 1.05 * duration);
+ ReservationSubmissionRequest sRequest =
+ createSimpleReservationRequest(4, arrival, deadline, duration);
+ ReservationSubmissionResponse sResponse = null;
+ try {
+ sResponse = clientService.submitReservation(sRequest);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(sResponse);
+ ReservationId reservationID = sResponse.getReservationId();
+ Assert.assertNotNull(reservationID);
+ LOG.info("Submit reservation response: " + reservationID);
+
+ // Update the reservation
+ ReservationDefinition rDef = sRequest.getReservationDefinition();
+ ReservationRequest rr =
+ rDef.getReservationRequests().getReservationResources().get(0);
+ rr.setNumContainers(5);
+ arrival = clock.getTime();
+ duration = 30000;
+ deadline = (long) (arrival + 1.05 * duration);
+ rr.setDuration(duration);
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ ReservationUpdateRequest uRequest =
+ ReservationUpdateRequest.newInstance(rDef, reservationID);
+ ReservationUpdateResponse uResponse = null;
+ try {
+ uResponse = clientService.updateReservation(uRequest);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(sResponse);
+ LOG.info("Update reservation response: " + uResponse);
+
+ // Delete the reservation
+ ReservationDeleteRequest dRequest =
+ ReservationDeleteRequest.newInstance(reservationID);
+ ReservationDeleteResponse dResponse = null;
+ try {
+ dResponse = clientService.deleteReservation(dRequest);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(sResponse);
+ LOG.info("Delete reservation response: " + dResponse);
+
+ // clean-up
+ rm.stop();
+ nm = null;
+ rm = null;
+ }
+
+ private ReservationSubmissionRequest createSimpleReservationRequest(
+ int numContainers, long arrival, long deadline, long duration) {
+ // create a request with a single atomic ask
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ numContainers, 1, duration);
+ ReservationRequests reqs =
+ ReservationRequests.newInstance(Collections.singletonList(r),
+ ReservationRequestInterpreter.R_ALL);
+ ReservationDefinition rDef =
+ ReservationDefinition.newInstance(arrival, deadline, reqs,
+ "testClientRMService#reservation");
+ ReservationSubmissionRequest request =
+ ReservationSubmissionRequest.newInstance(rDef,
+ ReservationSystemTestUtil.reservationQ);
+ return request;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
new file mode 100644
index 0000000..2a77791
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
@@ -0,0 +1,102 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCapacityReservationSystem {
+
+ @Test
+ public void testInitialize() {
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ CapacityScheduler capScheduler = null;
+ try {
+ capScheduler = testUtil.mockCapacityScheduler(10);
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ CapacityReservationSystem reservationSystem =
+ new CapacityReservationSystem();
+ reservationSystem.setRMContext(capScheduler.getRMContext());
+ try {
+ reservationSystem.reinitialize(capScheduler.getConf(),
+ capScheduler.getRMContext());
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ String planQName = testUtil.getreservationQueueName();
+ Plan plan = reservationSystem.getPlan(planQName);
+ Assert.assertNotNull(plan);
+ Assert.assertTrue(plan instanceof InMemoryPlan);
+ Assert.assertEquals(planQName, plan.getQueueName());
+ Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+ Assert
+ .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+ Assert
+ .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+ }
+
+ @Test
+ public void testReinitialize() {
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ CapacityScheduler capScheduler = null;
+ try {
+ capScheduler = testUtil.mockCapacityScheduler(10);
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ CapacityReservationSystem reservationSystem =
+ new CapacityReservationSystem();
+ CapacitySchedulerConfiguration conf = capScheduler.getConfiguration();
+ RMContext mockContext = capScheduler.getRMContext();
+ reservationSystem.setRMContext(mockContext);
+ try {
+ reservationSystem.reinitialize(capScheduler.getConfiguration(),
+ mockContext);
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ // Assert queue in original config
+ String planQName = testUtil.getreservationQueueName();
+ Plan plan = reservationSystem.getPlan(planQName);
+ Assert.assertNotNull(plan);
+ Assert.assertTrue(plan instanceof InMemoryPlan);
+ Assert.assertEquals(planQName, plan.getQueueName());
+ Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+ Assert
+ .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+ Assert
+ .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+ // Dynamically add a plan
+ String newQ = "reservation";
+ Assert.assertNull(reservationSystem.getPlan(newQ));
+ testUtil.updateQueueConfiguration(conf, newQ);
+ try {
+ capScheduler.reinitialize(conf, mockContext);
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ try {
+ reservationSystem.reinitialize(conf, mockContext);
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ Plan newPlan = reservationSystem.getPlan(newQ);
+ Assert.assertNotNull(newPlan);
+ Assert.assertTrue(newPlan instanceof InMemoryPlan);
+ Assert.assertEquals(newQ, newPlan.getQueueName());
+ Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
+ Assert
+ .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
+ Assert
+ .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
new file mode 100644
index 0000000..f5917bb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
@@ -0,0 +1,560 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReservationInputValidator {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestReservationInputValidator.class);
+
+ private static final String PLAN_NAME = "test-reservation";
+
+ private Clock clock;
+ private Map<String, Plan> plans = new HashMap<String, Plan>(1);
+ private ReservationSystem rSystem;
+ private Plan plan;
+
+ private ReservationInputValidator rrValidator;
+
+ @Before
+ public void setUp() {
+ clock = mock(Clock.class);
+ plan = mock(Plan.class);
+ rSystem = mock(ReservationSystem.class);
+ plans.put(PLAN_NAME, plan);
+ rrValidator = new ReservationInputValidator(clock);
+ when(clock.getTime()).thenReturn(1L);
+ ResourceCalculator rCalc = new DefaultResourceCalculator();
+ Resource resource = Resource.newInstance(10240, 10);
+ when(plan.getResourceCalculator()).thenReturn(rCalc);
+ when(plan.getTotalCapacity()).thenReturn(resource);
+ when(rSystem.getQueueForReservation(any(ReservationId.class))).thenReturn(
+ PLAN_NAME);
+ when(rSystem.getPlan(PLAN_NAME)).thenReturn(plan);
+ }
+
+ @After
+ public void tearDown() {
+ rrValidator = null;
+ clock = null;
+ plan = null;
+ }
+
+ @Test
+ public void testSubmitReservationNormal() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(plan);
+ }
+
+ @Test
+ public void testSubmitReservationDoesnotExist() {
+ ReservationSubmissionRequest request =
+ new ReservationSubmissionRequestPBImpl();
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .equals("The queue to submit is not specified. Please try again with a valid reservable queue."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationInvalidPlan() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+ when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .endsWith(" is not managed by reservation system. Please try again with a valid reservable queue."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationNoDefinition() {
+ ReservationSubmissionRequest request =
+ new ReservationSubmissionRequestPBImpl();
+ request.setQueue(PLAN_NAME);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .equals("Missing reservation definition. Please try again by specifying a reservation definition."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationInvalidDeadline() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 0, 3);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("The specified deadline: 0 is the past"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationInvalidRR() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(0, 0, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("No resources have been specified to reserve"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationEmptyRR() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 0, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("No resources have been specified to reserve"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationInvalidDuration() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 3, 4);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message.startsWith("The time difference"));
+ Assert
+ .assertTrue(message
+ .contains("must be greater or equal to the minimum resource duration"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationExceedsGangSize() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 5, 4);
+ Resource resource = Resource.newInstance(512, 1);
+ when(plan.getTotalCapacity()).thenReturn(resource);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("The size of the largest gang in the reservation refinition"));
+ Assert.assertTrue(message.contains("exceed the capacity available "));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationNormal() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(plan);
+ }
+
+ @Test
+ public void testUpdateReservationNoID() {
+ ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("Missing reservation id. Please try again by specifying a reservation id."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationDoesnotExist() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+ ReservationId rId = request.getReservationId();
+ when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message.equals(MessageFormat
+ .format(
+ "The specified reservation with ID: {0} is unknown. Please try again with a valid reservation.",
+ rId)));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationInvalidPlan() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+ when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .endsWith(" is not associated with any valid plan. Please try again with a valid reservation."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationNoDefinition() {
+ ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+ request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("Missing reservation definition. Please try again by specifying a reservation definition."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationInvalidDeadline() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 0, 3);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("The specified deadline: 0 is the past"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationInvalidRR() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(0, 0, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("No resources have been specified to reserve"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationEmptyRR() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 0, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("No resources have been specified to reserve"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationInvalidDuration() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 3, 4);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .contains("must be greater or equal to the minimum resource duration"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationExceedsGangSize() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+ Resource resource = Resource.newInstance(512, 1);
+ when(plan.getTotalCapacity()).thenReturn(resource);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("The size of the largest gang in the reservation refinition"));
+ Assert.assertTrue(message.contains("exceed the capacity available "));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testDeleteReservationNormal() {
+ ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ request.setReservationId(reservationID);
+ ReservationAllocation reservation = mock(ReservationAllocation.class);
+ when(plan.getReservationById(reservationID)).thenReturn(reservation);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(plan);
+ }
+
+ @Test
+ public void testDeleteReservationNoID() {
+ ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("Missing reservation id. Please try again by specifying a reservation id."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testDeleteReservationDoesnotExist() {
+ ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+ ReservationId rId = ReservationSystemTestUtil.getNewReservationId();
+ request.setReservationId(rId);
+ when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message.equals(MessageFormat
+ .format(
+ "The specified reservation with ID: {0} is unknown. Please try again with a valid reservation.",
+ rId)));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testDeleteReservationInvalidPlan() {
+ ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ request.setReservationId(reservationID);
+ when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .endsWith(" is not associated with any valid plan. Please try again with a valid reservation."));
+ LOG.info(message);
+ }
+ }
+
+ private ReservationSubmissionRequest createSimpleReservationSubmissionRequest(
+ int numRequests, int numContainers, long arrival, long deadline,
+ long duration) {
+ // create a request with a single atomic ask
+ ReservationSubmissionRequest request =
+ new ReservationSubmissionRequestPBImpl();
+ ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ if (numRequests > 0) {
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ rDef.setReservationRequests(reqs);
+ if (numContainers > 0) {
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ numContainers, 1, duration);
+
+ reqs.setReservationResources(Collections.singletonList(r));
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ }
+ }
+ request.setQueue(PLAN_NAME);
+ request.setReservationDefinition(rDef);
+ return request;
+ }
+
+ private ReservationUpdateRequest createSimpleReservationUpdateRequest(
+ int numRequests, int numContainers, long arrival, long deadline,
+ long duration) {
+ // create a request with a single atomic ask
+ ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+ ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ if (numRequests > 0) {
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ rDef.setReservationRequests(reqs);
+ if (numContainers > 0) {
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ numContainers, 1, duration);
+
+ reqs.setReservationResources(Collections.singletonList(r));
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ }
+ }
+ request.setReservationDefinition(rDef);
+ request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+ return request;
+ }
+
+}