You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/16 19:56:37 UTC
[18/23] hadoop git commit: YARN-1651. CapacityScheduler side changes
to support container resize. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 85d2515..8fa1ad2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -18,44 +18,51 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import static java.lang.Thread.sleep;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.Assert;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static java.lang.Thread.sleep;
public class TestApplicationMasterService {
private static final Log LOG = LogFactory
@@ -343,6 +350,92 @@ public class TestApplicationMasterService {
alloc1Response = am1.schedule();
Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size());
}
+
+  // Negative, above-maximum and duplicated resize requests must be rejected.
+  @Test(timeout=60000)
+  public void testInvalidIncreaseDecreaseRequest() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      // Register node1
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+      // Submit an application
+      RMApp app1 = rm.submitApp(1024);
+
+      // kick the scheduling
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      RegisterApplicationMasterResponse registerResponse =
+          am1.registerAppAttempt();
+
+      sentRMContainerLaunched(rm,
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1));
+
+      // A normal increase request should succeed
+      am1.sendContainerResizingRequest(Arrays.asList(
+          ContainerResourceChangeRequest.newInstance(
+              ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+              Resources.createResource(2048))), null);
+
+      // Target resource is negative, should fail
+      boolean exceptionCaught = false;
+      try {
+        am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest.newInstance(
+                ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+                Resources.createResource(-1))), null);
+      } catch (InvalidResourceRequestException e) {
+        // This is expected
+        exceptionCaught = true;
+      }
+      Assert.assertTrue(exceptionCaught);
+
+      // Target resource is more than maxAllocation, should fail
+      exceptionCaught = false; // reset, or the assertion below is vacuous
+      try {
+        am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest.newInstance(
+                ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+                Resources
+                    .add(registerResponse.getMaximumResourceCapability(),
+                        Resources.createResource(1)))), null);
+      } catch (InvalidResourceRequestException e) {
+        // This is expected
+        exceptionCaught = true;
+      }
+      Assert.assertTrue(exceptionCaught);
+
+      // Contains multiple increase/decrease requests for same containerId
+      exceptionCaught = false; // reset for this case as well
+      try {
+        am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest.newInstance(
+                ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+                Resources
+                    .add(registerResponse.getMaximumResourceCapability(),
+                        Resources.createResource(1)))), Arrays.asList(
+            ContainerResourceChangeRequest.newInstance(
+                ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+                Resources
+                    .add(registerResponse.getMaximumResourceCapability(),
+                        Resources.createResource(1)))));
+      } catch (InvalidResourceRequestException e) {
+        // This is expected
+        exceptionCaught = true;
+      }
+      Assert.assertTrue(exceptionCaught);
+    } finally {
+      if (rm != null) {
+        rm.close();
+      }
+    }
+  }
private static class MyResourceManager extends MockRM {
@@ -354,4 +447,15 @@ public class TestApplicationMasterService {
return new DrainDispatcher();
}
}
+
+  private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
+    // Deliver a LAUNCHED event to the scheduler's RMContainer for this id,
+    // failing the test when the scheduler does not track that container.
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getResourceScheduler();
+    RMContainer target = scheduler.getRMContainer(containerId);
+    Assert.assertNotNull("Cannot find RMContainer", target);
+    target.handle(
+        new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index dc843b9..168280a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@@ -331,11 +332,15 @@ public class TestAMRestart {
MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
RegisterApplicationMasterResponse registerResponse =
am2.registerAppAttempt();
- rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+ rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
// check am2 get the nm token from am1.
- Assert.assertEquals(expectedNMTokens,
- registerResponse.getNMTokensFromPreviousAttempts());
+ Assert.assertEquals(expectedNMTokens.size(),
+ registerResponse.getNMTokensFromPreviousAttempts().size());
+ for (int i = 0; i < expectedNMTokens.size(); i++) {
+ Assert.assertTrue(expectedNMTokens.get(i)
+ .equals(registerResponse.getNMTokensFromPreviousAttempts().get(i)));
+ }
// am2 allocate 1 container on nm2
containers = new ArrayList<Container>();
@@ -365,7 +370,7 @@ public class TestAMRestart {
// restart am
MockAM am3 = MockRM.launchAM(app1, rm1, nm1);
registerResponse = am3.registerAppAttempt();
- rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+ rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
// check am3 get the NM token from both am1 and am2;
List<NMToken> transferredTokens = registerResponse.getNMTokensFromPreviousAttempts();
@@ -430,7 +435,7 @@ public class TestAMRestart {
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
- "", ContainerExitStatus.DISKS_FAILED);
+ "", ContainerExitStatus.DISKS_FAILED, Resources.createResource(200));
currentNode.containerStatus(containerStatus);
am1.waitForState(RMAppAttemptState.FAILED);
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 484a1b6..1f307aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -165,7 +165,7 @@ public class TestRMAppLogAggregationStatus {
node1ReportForApp.add(report1);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
- null, node1ReportForApp));
+ null, node1ReportForApp, null));
List<LogAggregationReport> node2ReportForApp =
new ArrayList<LogAggregationReport>();
@@ -177,7 +177,7 @@ public class TestRMAppLogAggregationStatus {
node2ReportForApp.add(report2);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
- null, node2ReportForApp));
+ null, node2ReportForApp, null));
// node1 and node2 has updated its log aggregation status
// verify that the log aggregation status for node1, node2
// has been changed
@@ -215,7 +215,7 @@ public class TestRMAppLogAggregationStatus {
node1ReportForApp2.add(report1_2);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
- null, node1ReportForApp2));
+ null, node1ReportForApp2, null));
// verify that the log aggregation status for node1
// has been changed
@@ -284,7 +284,7 @@ public class TestRMAppLogAggregationStatus {
// 10 diagnostic messages/failure messages
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
- null, node1ReportForApp3));
+ null, node1ReportForApp3, null));
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
Assert.assertEquals(2, logAggregationStatus.size());
@@ -329,7 +329,7 @@ public class TestRMAppLogAggregationStatus {
node2ReportForApp2.add(report2_3);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
- null, node2ReportForApp2));
+ null, node2ReportForApp2, null));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 10ec453..828e149 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -465,10 +465,9 @@ public class TestRMAppAttemptTransitions {
expectedAllocateCount = 1;
}
- assertEquals(expectedState,
- applicationAttempt.getAppAttemptState());
- verify(scheduler, times(expectedAllocateCount)).
- allocate(any(ApplicationAttemptId.class),
+ assertEquals(expectedState, applicationAttempt.getAppAttemptState());
+ verify(scheduler, times(expectedAllocateCount)).allocate(
+ any(ApplicationAttemptId.class), any(List.class), any(List.class),
any(List.class), any(List.class), any(List.class), any(List.class));
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
@@ -488,11 +487,9 @@ public class TestRMAppAttemptTransitions {
assertEquals(amContainer, applicationAttempt.getMasterContainer());
// Check events
verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
- verify(scheduler, times(2)).
- allocate(
- any(
- ApplicationAttemptId.class), any(List.class), any(List.class),
- any(List.class), any(List.class));
+ verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class),
+ any(List.class), any(List.class), any(List.class), any(List.class),
+ any(List.class), any(List.class));
verify(nmTokenManager).clearNodeSetForAttempt(
applicationAttempt.getAppAttemptId());
}
@@ -641,13 +638,9 @@ public class TestRMAppAttemptTransitions {
Allocation allocation = mock(Allocation.class);
when(allocation.getContainers()).
thenReturn(Collections.singletonList(container));
- when(
- scheduler.allocate(
- any(ApplicationAttemptId.class),
- any(List.class),
- any(List.class),
- any(List.class),
- any(List.class))).
+ when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
+ any(List.class), any(List.class), any(List.class), any(List.class),
+ any(List.class))).
thenReturn(allocation);
RMContainer rmContainer = mock(RMContainerImpl.class);
when(scheduler.getRMContainer(container.getId())).
@@ -1511,10 +1504,9 @@ public class TestRMAppAttemptTransitions {
@Test
public void testScheduleTransitionReplaceAMContainerRequestWithDefaults() {
YarnScheduler mockScheduler = mock(YarnScheduler.class);
- when(
- mockScheduler.allocate(any(ApplicationAttemptId.class),
- any(List.class), any(List.class), any(List.class), any(List.class)))
- .thenAnswer(new Answer<Allocation>() {
+ when(mockScheduler.allocate(any(ApplicationAttemptId.class),
+ any(List.class), any(List.class), any(List.class), any(List.class),
+ any(List.class), any(List.class))).thenAnswer(new Answer<Allocation>() {
@SuppressWarnings("rawtypes")
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index e4e2049..415e891 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -191,6 +190,10 @@ public class TestRMContainerImpl {
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
+
+ ConcurrentMap<ApplicationId, RMApp> appMap = new ConcurrentHashMap<>();
+ RMApp rmApp = mock(RMApp.class);
+ appMap.putIfAbsent(appId, rmApp);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
@@ -200,6 +203,7 @@ public class TestRMContainerImpl {
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
+ when(rmContext.getRMApps()).thenReturn(appMap);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, "user", rmContext);
@@ -235,11 +239,118 @@ public class TestRMContainerImpl {
rmContainer.handle(new RMContainerFinishedEvent(containerId,
containerStatus, RMContainerEventType.EXPIRE));
drainDispatcher.await();
+ assertEquals(RMContainerState.EXPIRED, rmContainer.getState());
+ verify(writer, times(1)).containerFinished(any(RMContainer.class));
+ verify(publisher, times(1)).containerFinished(any(RMContainer.class),
+ anyLong());
+ }
+
+ private void testExpireAfterIncreased(boolean acquired) {
+ /*
+ * Like the previous test, but a RUNNING container first has its resource
+ * increased (and, if acquired is true, the AM acquires that increase)
+ * before it expires; either way it should end up EXPIRED and finished.
+ */
+ DrainDispatcher drainDispatcher = new DrainDispatcher();
+ EventHandler<RMAppAttemptEvent> appAttemptEventHandler =
+ mock(EventHandler.class);
+ EventHandler generic = mock(EventHandler.class);
+ drainDispatcher.register(RMAppAttemptEventType.class,
+ appAttemptEventHandler);
+ drainDispatcher.register(RMNodeEventType.class, generic);
+ drainDispatcher.init(new YarnConfiguration());
+ drainDispatcher.start();
+ NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 1);
+ ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+ ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
+
+ Resource resource = BuilderUtils.newResource(512, 1);
+ Priority priority = BuilderUtils.newPriority(5);
+
+ Container container = BuilderUtils.newContainer(containerId, nodeId,
+ "host:3465", resource, priority, null);
+
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+ SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+ when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+ when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
+ when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+ when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
+ ConcurrentMap<ApplicationId, RMApp> apps =
+ new ConcurrentHashMap<ApplicationId, RMApp>();
+ apps.put(appId, mock(RMApp.class));
+ when(rmContext.getRMApps()).thenReturn(apps);
+ RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+ nodeId, "user", rmContext);
+
+ assertEquals(RMContainerState.NEW, rmContainer.getState());
+ assertEquals(resource, rmContainer.getAllocatedResource());
+ assertEquals(nodeId, rmContainer.getAllocatedNode());
+ assertEquals(priority, rmContainer.getAllocatedPriority());
+ verify(writer).containerStarted(any(RMContainer.class));
+ verify(publisher).containerCreated(any(RMContainer.class), anyLong());
+
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.START));
+ drainDispatcher.await();
+ assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
+
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.ACQUIRED));
+ drainDispatcher.await();
+ assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
+
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.LAUNCHED));
+ drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
- verify(writer, never()).containerFinished(any(RMContainer.class));
- verify(publisher, never()).containerFinished(any(RMContainer.class),
+ assertEquals(
+ "http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
+ rmContainer.getLogURL());
+
+ // newResource is more than the old resource
+ Resource newResource = BuilderUtils.newResource(1024, 2);
+ rmContainer.handle(new RMContainerChangeResourceEvent(containerId,
+ newResource, true));
+
+ if (acquired) {
+ rmContainer
+ .handle(new RMContainerUpdatesAcquiredEvent(containerId, true));
+ drainDispatcher.await();
+ // Acquiring the increase must not change the state; the container is
+ // still RUNNING.
+ assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+ }
+
+ // In RUNNING state. Verify EXPIRE and associated actions.
+ reset(appAttemptEventHandler);
+ ContainerStatus containerStatus = SchedulerUtils
+ .createAbnormalContainerStatus(containerId,
+ SchedulerUtils.EXPIRED_CONTAINER);
+ rmContainer.handle(new RMContainerFinishedEvent(containerId,
+ containerStatus, RMContainerEventType.EXPIRE));
+ drainDispatcher.await();
+ assertEquals(RMContainerState.EXPIRED, rmContainer.getState());
+
+ // Whether or not the increase was acquired, the expired container is
+ // reported as finished exactly once to the history writer and publisher.
+ verify(writer, times(1)).containerFinished(any(RMContainer.class));
+ verify(publisher, times(1)).containerFinished(any(RMContainer.class),
anyLong());
}
+
+  @Test
+  public void testExpireAfterContainerResourceIncreased() throws Exception {
+    // Increased and acquired by the AM first, then increased but not acquired.
+    for (boolean acquiredByAM : new boolean[] {true, false}) {
+      testExpireAfterIncreased(acquiredByAM);
+    }
+  }
@Test
public void testExistenceOfResourceRequestInRMContainer() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 88c1444..7f6a749 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -31,7 +31,6 @@ import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -103,6 +103,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptI
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -139,7 +141,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -678,11 +679,11 @@ public class TestCapacityScheduler {
// Verify the blacklist can be updated independent of requesting containers
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
- Collections.singletonList(host), null);
+ Collections.singletonList(host), null, null, null);
Assert.assertTrue(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
- Collections.singletonList(host));
+ Collections.singletonList(host), null, null);
Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
rm.stop();
}
@@ -777,7 +778,7 @@ public class TestCapacityScheduler {
cs.allocate(appAttemptId1,
Collections.<ResourceRequest>singletonList(r1),
Collections.<ContainerId>emptyList(),
- null, null);
+ null, null, null, null);
//And this will result in container assignment for app1
CapacityScheduler.schedule(cs);
@@ -794,7 +795,7 @@ public class TestCapacityScheduler {
cs.allocate(appAttemptId2,
Collections.<ResourceRequest>singletonList(r2),
Collections.<ContainerId>emptyList(),
- null, null);
+ null, null, null, null);
//In this case we do not perform container assignment because we want to
//verify re-ordering based on the allocation alone
@@ -2907,7 +2908,7 @@ public class TestCapacityScheduler {
Allocation allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
- Collections.<ContainerId> emptyList(), null, null);
+ Collections.<ContainerId> emptyList(), null, null, null, null);
Assert.assertNotNull(attempt);
@@ -2923,7 +2924,7 @@ public class TestCapacityScheduler {
allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
- Collections.<ContainerId> emptyList(), null, null);
+ Collections.<ContainerId> emptyList(), null, null, null, null);
// All resources should be sent as headroom
Assert.assertEquals(newResource, allocate.getResourceLimit());
@@ -3084,7 +3085,107 @@ public class TestCapacityScheduler {
config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getName());
verifyAMLimitForLeafQueue(config);
+ }
+
+  /**
+   * Returns the current scheduler attempt of the given application, looked
+   * up through the RM's CapacityScheduler.
+   */
+  private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
+      ApplicationId appId) {
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getResourceScheduler();
+    return scheduler.getSchedulerApplications()
+        .get(appId)
+        .getCurrentAppAttempt();
+  }
+  /**
+   * Verifies that app/queue pending resource tracks only the delta of the
+   * outstanding increase requests. A newer increase request for the same
+   * container replaces the previous one, and a request whose target equals
+   * the current size contributes nothing.
+   */
+  @Test
+  public void testPendingResourceUpdatedAccordingToIncreaseRequestChanges()
+      throws Exception {
+    Configuration conf =
+        TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm = new MockRM(conf, memStore) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.start();
+    try {
+      MockNM nm1 = // label = ""
+          new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+      nm1.registerNode();
+
+      // Launch app1 in queue=a1
+      RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+      // Allocate two more containers (2GB each)
+      am1.allocate(
+          Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+              "*", Resources.createResource(2 * GB), 2)),
+          null);
+      ContainerId containerId1 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+      ContainerId containerId2 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+      ContainerId containerId3 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+      Assert.assertTrue(rm.waitForState(nm1, containerId3,
+          RMContainerState.ALLOCATED, 10 * 1000));
+      // Acquire them, then mark all three containers (AM included) launched
+      am1.allocate(null, null);
+      sentRMContainerLaunched(rm, containerId1);
+      sentRMContainerLaunched(rm, containerId2);
+      sentRMContainerLaunched(rm, containerId3);
+
+      // am1 asks to change its AM container from 1GB to 3GB
+      am1.sendContainerResizingRequest(Arrays.asList(
+          ContainerResourceChangeRequest
+              .newInstance(containerId1, Resources.createResource(3 * GB))),
+          null);
+
+      FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
+
+      // Pending = 3GB - 1GB
+      Assert.assertEquals(2 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+      checkPendingResource(rm, "a1", 2 * GB, null);
+      checkPendingResource(rm, "a", 2 * GB, null);
+      checkPendingResource(rm, "root", 2 * GB, null);
+
+      // am1 asks to change containerId2 (2G -> 3G) and containerId3 (2G -> 5G)
+      am1.sendContainerResizingRequest(Arrays.asList(
+          ContainerResourceChangeRequest
+              .newInstance(containerId2, Resources.createResource(3 * GB)),
+          ContainerResourceChangeRequest
+              .newInstance(containerId3, Resources.createResource(5 * GB))),
+          null);
+
+      // Pending = 2GB (c1) + 1GB (c2) + 3GB (c3)
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+      checkPendingResource(rm, "a1", 6 * GB, null);
+      checkPendingResource(rm, "a", 6 * GB, null);
+      checkPendingResource(rm, "root", 6 * GB, null);
+
+      // am1 asks to change containerId1 (1G->3G), containerId2 (2G -> 4G) and
+      // containerId3 (2G -> 2G)
+      am1.sendContainerResizingRequest(Arrays.asList(
+          ContainerResourceChangeRequest
+              .newInstance(containerId1, Resources.createResource(3 * GB)),
+          ContainerResourceChangeRequest
+              .newInstance(containerId2, Resources.createResource(4 * GB)),
+          ContainerResourceChangeRequest
+              .newInstance(containerId3, Resources.createResource(2 * GB))),
+          null);
+      // Pending = 2GB (c1) + 2GB (c2) + 0GB (c3); earlier requests replaced
+      Assert.assertEquals(4 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+      checkPendingResource(rm, "a1", 4 * GB, null);
+      checkPendingResource(rm, "a", 4 * GB, null);
+      checkPendingResource(rm, "root", 4 * GB, null);
+    } finally {
+      // Stop the RM even if an assertion above fails, so its services are
+      // not left running.
+      rm.stop();
+    }
}
private void verifyAMLimitForLeafQueue(CapacitySchedulerConfiguration config)
@@ -3146,4 +3247,15 @@ public class TestCapacityScheduler {
+ CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
conf.setInt(propName, maxAllocVcores);
}
+
+  /**
+   * Delivers a LAUNCHED RMContainerEvent directly to the RMContainer that the
+   * CapacityScheduler holds for {@code containerId}. Fails the test with
+   * "Cannot find RMContainer" when the scheduler has no such container.
+   */
+  private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getResourceScheduler();
+    RMContainer container = scheduler.getRMContainer(containerId);
+    Assert.assertNotNull("Cannot find RMContainer", container);
+    container.handle(
+        new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 9dcab2e..88c7c13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -132,11 +132,11 @@ public class TestChildQueueOrder {
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
- allocatedResource, RMNodeLabelsManager.NO_LABEL);
+ allocatedResource, RMNodeLabelsManager.NO_LABEL, false);
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
- allocatedResource, null, null);
+ allocatedResource, null, null, false);
}
// Next call - nothing
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 769041b..b5b2222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -60,9 +59,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
public class TestContainerAllocation {
@@ -199,13 +195,16 @@ public class TestContainerAllocation {
// acquire the container.
SecurityUtilTestHelper.setTokenServiceUseIp(true);
- List<Container> containers =
- am1.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers();
- // not able to fetch the container;
- Assert.assertEquals(0, containers.size());
-
- SecurityUtilTestHelper.setTokenServiceUseIp(false);
+ List<Container> containers;
+ try {
+ containers =
+ am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers();
+ // not able to fetch the container;
+ Assert.assertEquals(0, containers.size());
+ } finally {
+ SecurityUtilTestHelper.setTokenServiceUseIp(false);
+ }
containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
@@ -315,21 +314,24 @@ public class TestContainerAllocation {
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
- SecurityUtilTestHelper.setTokenServiceUseIp(true);
- RMApp app1 = rm1.submitApp(200);
- RMAppAttempt attempt = app1.getCurrentAppAttempt();
- nm1.nodeHeartbeat(true);
-
- // fetching am container will fail, keep retrying 5 times.
- while (numRetries <= 5) {
+ RMApp app1;
+ try {
+ SecurityUtilTestHelper.setTokenServiceUseIp(true);
+ app1 = rm1.submitApp(200);
+ RMAppAttempt attempt = app1.getCurrentAppAttempt();
nm1.nodeHeartbeat(true);
- Thread.sleep(1000);
- Assert.assertEquals(RMAppAttemptState.SCHEDULED,
- attempt.getAppAttemptState());
- System.out.println("Waiting for am container to be allocated.");
- }
- SecurityUtilTestHelper.setTokenServiceUseIp(false);
+ // fetching am container will fail, keep retrying 5 times.
+ while (numRetries <= 5) {
+ nm1.nodeHeartbeat(true);
+ Thread.sleep(1000);
+ Assert.assertEquals(RMAppAttemptState.SCHEDULED,
+ attempt.getAppAttemptState());
+ System.out.println("Waiting for am container to be allocated.");
+ }
+ } finally {
+ SecurityUtilTestHelper.setTokenServiceUseIp(false);
+ }
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
new file mode 100644
index 0000000..23283f6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -0,0 +1,963 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestContainerResizing {
+  // Megabytes per gigabyte; resource sizes below are expressed in MB.
+  private static final int GB = 1024;
+
+  // Rebuilt for every test in setUp().
+  private YarnConfiguration conf;
+
+  // Label manager handed to each MockRM via createNodeLabelManager().
+  RMNodeLabelsManager mgr;
+
+  /**
+   * Creates a configuration that selects the CapacityScheduler, plus an
+   * initialized NullRMNodeLabelsManager, before every test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    YarnConfiguration yarnConf = new YarnConfiguration();
+    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class, ResourceScheduler.class);
+    conf = yarnConf;
+
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  /**
+   * Application has a container running, and the node has enough available
+   * resource. Add an increase request to see if the container will be
+   * increased.
+   */
+  @Test
+  public void testSimpleIncreaseContainer() throws Exception {
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    try {
+      MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+
+      // app1 -> default
+      RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+      ContainerId containerId1 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+      sentRMContainerLaunched(rm1, containerId1);
+      // am1 asks to change its AM container from 1GB to 3GB
+      am1.sendContainerResizingRequest(Arrays.asList(
+          ContainerResourceChangeRequest
+              .newInstance(containerId1, Resources.createResource(3 * GB))),
+          null);
+
+      FiCaSchedulerApp app =
+          getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+      // Only the delta (3GB - 1GB) is counted as pending
+      checkPendingResource(rm1, "default", 2 * GB, null);
+      Assert.assertEquals(2 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+
+      // NM1 does one heartbeat
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+      // Pending resource should be deducted
+      checkPendingResource(rm1, "default", 0 * GB, null);
+      Assert.assertEquals(0 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+
+      verifyContainerIncreased(am1.allocate(null, null), containerId1, 3 * GB);
+      verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 17 * GB);
+    } finally {
+      // Release the RM even when an assertion above fails.
+      rm1.close();
+    }
+  }
+
+  /**
+   * Application has a container running, try to decrease the container and
+   * check that the queue's usage and the container resource are updated.
+   */
+  @Test
+  public void testSimpleDecreaseContainer() throws Exception {
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    try {
+      MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+
+      // app1 -> default
+      RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "default");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+      FiCaSchedulerApp app =
+          getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+      checkUsedResource(rm1, "default", 3 * GB, null);
+      Assert.assertEquals(3 * GB,
+          app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+      ContainerId containerId1 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+      sentRMContainerLaunched(rm1, containerId1);
+
+      // am1 asks to change its AM container from 3GB to 1GB (decrease)
+      AllocateResponse response = am1.sendContainerResizingRequest(null,
+          Arrays.asList(ContainerResourceChangeRequest
+              .newInstance(containerId1, Resources.createResource(1 * GB))));
+
+      // The decrease is visible right away, without any node heartbeat
+      verifyContainerDecreased(response, containerId1, 1 * GB);
+      checkUsedResource(rm1, "default", 1 * GB, null);
+      Assert.assertEquals(1 * GB,
+          app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+      // Check that the decreased container was added to the RMNode
+      RMNodeImpl rmNode =
+          (RMNodeImpl) rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+      Collection<Container> decreasedContainers =
+          rmNode.getToBeDecreasedContainers();
+      boolean rmNodeReceivedDecreaseContainer = false;
+      for (Container c : decreasedContainers) {
+        if (c.getId().equals(containerId1)
+            && c.getResource().equals(Resources.createResource(1 * GB))) {
+          rmNodeReceivedDecreaseContainer = true;
+          break;
+        }
+      }
+      Assert.assertTrue("RMNode did not receive the decreased container",
+          rmNodeReceivedDecreaseContainer);
+    } finally {
+      // Release the RM even when an assertion above fails.
+      rm1.close();
+    }
+  }
+
+  /**
+   * Application has two containers running and tries to increase one of
+   * them, but the node doesn't have enough resource, so the increase request
+   * is reserved. Check resource usage after the reservation; once another
+   * container finishes, the reserved increase should be allocated.
+   */
+  @Test
+  public void testSimpleIncreaseRequestReservation() throws Exception {
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    try {
+      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+      // A second 8GB node is registered but not used directly; presumably it
+      // only raises cluster capacity so the queue can hold 9GB
+      // (used + reserved). TODO confirm.
+      rm1.registerNode("h2:1234", 8 * GB);
+
+      // app1 -> default
+      RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+      FiCaSchedulerApp app =
+          getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+      // Allocate one more container (2GB)
+      am1.allocate(
+          Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+              "*", Resources.createResource(2 * GB), 1)),
+          null);
+      ContainerId containerId2 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+      Assert.assertTrue(rm1.waitForState(nm1, containerId2,
+          RMContainerState.ALLOCATED, 10 * 1000));
+      // Acquire it, then mark it launched
+      am1.allocate(null, null);
+      sentRMContainerLaunched(rm1, containerId2);
+
+      ContainerId containerId1 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+      sentRMContainerLaunched(rm1, containerId1);
+
+      // am1 asks to change its AM container from 1GB to 7GB
+      am1.sendContainerResizingRequest(Arrays.asList(
+          ContainerResourceChangeRequest
+              .newInstance(containerId1, Resources.createResource(7 * GB))),
+          null);
+
+      checkPendingResource(rm1, "default", 6 * GB, null);
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+
+      // NM1 does one heartbeat
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+      RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
+
+      /* Check reservation statuses */
+      // Increase request should be reserved
+      Assert.assertTrue(rmContainer1.hasIncreaseReservation());
+      Assert.assertEquals(6 * GB,
+          rmContainer1.getReservedResource().getMemory());
+      Assert.assertFalse(app.getReservedContainers().isEmpty());
+      Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+      // Pending resource will not be changed since it's not satisfied
+      checkPendingResource(rm1, "default", 6 * GB, null);
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+      // Queue/user usage includes the 6GB reservation (1 + 2 + 6); the app
+      // keeps 3GB used and tracks the 6GB separately as reserved
+      checkUsedResource(rm1, "default", 9 * GB, null);
+      Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+          .getUser("user").getUsed().getMemory());
+      Assert.assertEquals(3 * GB,
+          app.getAppAttemptResourceUsage().getUsed().getMemory());
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+      // Complete one container and do another allocation
+      am1.allocate(null, Arrays.asList(containerId2));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+      // Now container should be increased
+      verifyContainerIncreased(am1.allocate(null, null), containerId1, 7 * GB);
+
+      /* Check statuses after reservation satisfied */
+      // Increase request should be unreserved
+      Assert.assertFalse(rmContainer1.hasIncreaseReservation());
+      Assert.assertTrue(app.getReservedContainers().isEmpty());
+      Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+      // Pending resource will be changed since it's satisfied
+      checkPendingResource(rm1, "default", 0 * GB, null);
+      Assert.assertEquals(0 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+      // Queue/user/application's usage will be updated
+      checkUsedResource(rm1, "default", 7 * GB, null);
+      Assert.assertEquals(7 * GB, ((LeafQueue) cs.getQueue("default"))
+          .getUser("user").getUsed().getMemory());
+      Assert.assertEquals(0 * GB,
+          app.getAppAttemptResourceUsage().getReserved().getMemory());
+      Assert.assertEquals(7 * GB,
+          app.getAppAttemptResourceUsage().getUsed().getMemory());
+      verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 1 * GB);
+    } finally {
+      // Release the RM even when an assertion above fails.
+      rm1.close();
+    }
+  }
+
+  /**
+   * Application has two containers running and tries to increase one of
+   * them, but the node doesn't have enough resource, so the increase request
+   * is reserved. Check resource usage after the reservation, then finish a
+   * container and cancel the increase request; the reservation should be
+   * cancelled.
+   */
+  @Test
+  public void testExcessiveReservationWhenCancelIncreaseRequest()
+      throws Exception {
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    try {
+      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+      // A second 8GB node is registered but not used directly; presumably it
+      // only raises cluster capacity so the queue can hold 9GB
+      // (used + reserved). TODO confirm.
+      rm1.registerNode("h2:1234", 8 * GB);
+
+      // app1 -> default
+      RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+      FiCaSchedulerApp app =
+          getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+      // Allocate one more container (2GB)
+      am1.allocate(
+          Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+              "*", Resources.createResource(2 * GB), 1)),
+          null);
+      ContainerId containerId2 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+      Assert.assertTrue(rm1.waitForState(nm1, containerId2,
+          RMContainerState.ALLOCATED, 10 * 1000));
+      // Acquire it, then mark it launched
+      am1.allocate(null, null);
+      sentRMContainerLaunched(rm1, containerId2);
+
+      ContainerId containerId1 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+      sentRMContainerLaunched(rm1, containerId1);
+
+      // am1 asks to change its AM container from 1GB to 7GB
+      am1.sendContainerResizingRequest(Arrays.asList(
+          ContainerResourceChangeRequest
+              .newInstance(containerId1, Resources.createResource(7 * GB))),
+          null);
+
+      checkPendingResource(rm1, "default", 6 * GB, null);
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+
+      // NM1 does one heartbeat
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+      RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
+
+      /* Check reservation statuses */
+      // Increase request should be reserved
+      Assert.assertTrue(rmContainer1.hasIncreaseReservation());
+      Assert.assertEquals(6 * GB,
+          rmContainer1.getReservedResource().getMemory());
+      Assert.assertFalse(app.getReservedContainers().isEmpty());
+      Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+      // Pending resource will not be changed since it's not satisfied
+      checkPendingResource(rm1, "default", 6 * GB, null);
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+      // Queue/user usage includes the 6GB reservation (1 + 2 + 6); the app
+      // keeps 3GB used and tracks the 6GB separately as reserved
+      checkUsedResource(rm1, "default", 9 * GB, null);
+      Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+          .getUser("user").getUsed().getMemory());
+      Assert.assertEquals(3 * GB,
+          app.getAppAttemptResourceUsage().getUsed().getMemory());
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+      // Complete one container and cancel the increase request (by sending
+      // an increase request with target_capacity == existing_capacity)
+      am1.allocate(null, Arrays.asList(containerId2));
+      // am1 asks to change its AM container from 1G to 1G (this actually
+      // cancels the increase request)
+      am1.sendContainerResizingRequest(Arrays.asList(
+          ContainerResourceChangeRequest
+              .newInstance(containerId1, Resources.createResource(1 * GB))),
+          null);
+      // Trigger a node heartbeat
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+      /* Check statuses after the increase request is cancelled */
+      // Reservation should be released
+      Assert.assertTrue(app.getReservedContainers().isEmpty());
+      Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+      Assert.assertFalse(rmContainer1.hasIncreaseReservation());
+      // Pending resource is cleared since the increase request was cancelled
+      checkPendingResource(rm1, "default", 0 * GB, null);
+      Assert.assertEquals(0 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+      // Queue/user/application's usage drops back to the 1GB AM container
+      checkUsedResource(rm1, "default", 1 * GB, null);
+      Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default"))
+          .getUser("user").getUsed().getMemory());
+      Assert.assertEquals(0 * GB,
+          app.getAppAttemptResourceUsage().getReserved().getMemory());
+      Assert.assertEquals(1 * GB,
+          app.getAppAttemptResourceUsage().getUsed().getMemory());
+    } finally {
+      // Release the RM even when an assertion above fails.
+      rm1.close();
+    }
+  }
+
+  /**
+   * Very similar to testExcessiveReservationWhenCancelIncreaseRequest: after
+   * the increase request is reserved, the app decreases the same container.
+   * The container should be decreased and the reservation cancelled.
+   */
+  @Test
+  public void testExcessiveReservationWhenDecreaseSameContainer()
+      throws Exception {
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    try {
+      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+      // A second 8GB node is registered but not used directly; presumably it
+      // only raises cluster capacity so the queue can hold 10GB
+      // (used + reserved). TODO confirm.
+      rm1.registerNode("h2:1234", 8 * GB);
+
+      // app1 -> default
+      RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "default");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+      FiCaSchedulerApp app =
+          getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+      // Allocate one more container (2GB)
+      am1.allocate(
+          Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+              "*", Resources.createResource(2 * GB), 1)),
+          null);
+      ContainerId containerId2 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+      Assert.assertTrue(rm1.waitForState(nm1, containerId2,
+          RMContainerState.ALLOCATED, 10 * 1000));
+      // Acquire it, then mark it launched
+      am1.allocate(null, null);
+      sentRMContainerLaunched(rm1, containerId2);
+
+      ContainerId containerId1 =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+      sentRMContainerLaunched(rm1, containerId1);
+
+      // am1 asks to change its AM container from 2GB to 8GB
+      am1.sendContainerResizingRequest(Arrays.asList(
+          ContainerResourceChangeRequest
+              .newInstance(containerId1, Resources.createResource(8 * GB))),
+          null);
+
+      checkPendingResource(rm1, "default", 6 * GB, null);
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+
+      // NM1 does one heartbeat
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+      RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
+
+      /* Check reservation statuses */
+      // Increase request should be reserved
+      Assert.assertTrue(rmContainer1.hasIncreaseReservation());
+      Assert.assertEquals(6 * GB,
+          rmContainer1.getReservedResource().getMemory());
+      Assert.assertFalse(app.getReservedContainers().isEmpty());
+      Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+      // Pending resource will not be changed since it's not satisfied
+      checkPendingResource(rm1, "default", 6 * GB, null);
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+      // Queue/user usage includes the 6GB reservation (2 + 2 + 6); the app
+      // keeps 4GB used and tracks the 6GB separately as reserved
+      checkUsedResource(rm1, "default", 10 * GB, null);
+      Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
+          .getUser("user").getUsed().getMemory());
+      Assert.assertEquals(4 * GB,
+          app.getAppAttemptResourceUsage().getUsed().getMemory());
+      Assert.assertEquals(6 * GB,
+          app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+      // Complete one container, then decrease the reserved AM container;
+      // the decrease is expected to also drop the pending increase
+      am1.allocate(null, Arrays.asList(containerId2));
+      // am1 asks to change its AM container from 2G to 1G (decrease)
+      am1.sendContainerResizingRequest(null, Arrays.asList(
+          ContainerResourceChangeRequest
+              .newInstance(containerId1, Resources.createResource(1 * GB))));
+      // Trigger a node heartbeat
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+      /* Check statuses after the decrease */
+      // Increase reservation should be released
+      Assert.assertTrue(app.getReservedContainers().isEmpty());
+      Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+      Assert.assertFalse(rmContainer1.hasIncreaseReservation());
+      // Pending resource is cleared since the increase request is gone
+      checkPendingResource(rm1, "default", 0 * GB, null);
+      Assert.assertEquals(0 * GB,
+          app.getAppAttemptResourceUsage().getPending().getMemory());
+      // Queue/user/application's usage drops to the decreased 1GB container
+      checkUsedResource(rm1, "default", 1 * GB, null);
+      Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default"))
+          .getUser("user").getUsed().getMemory());
+      Assert.assertEquals(0 * GB,
+          app.getAppAttemptResourceUsage().getReserved().getMemory());
+      Assert.assertEquals(1 * GB,
+          app.getAppAttemptResourceUsage().getUsed().getMemory());
+    } finally {
+      // Release the RM even when an assertion above fails.
+      rm1.close();
+    }
+  }
+
+ @Test
+ public void testIncreaseContainerUnreservedWhenContainerCompleted()
+ throws Exception {
+ /**
+ * App has two containers on the same node (node.resource = 8G): the AM
+ * container (container1) = 1G and container2 = 2G. App asks to increase
+ * container2 to 8G.
+ *
+ * The 6G delta does not fit in the node's remaining space, so the increase
+ * request will be reserved. When app releases container2, the reserved
+ * part should be released as well.
+ */
+ MockRM rm1 = new MockRM() {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ // NOTE(review): nm2 never heartbeats in this test; presumably it is
+ // registered only to add cluster capacity so the queue can hold 9G of
+ // used + reserved resource -- confirm.
+ MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+ // app1 -> default queue
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+ // Allocate one more container (container2, 2G)
+ am1.allocate(
+ Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(2 * GB), 1)),
+ null);
+ ContainerId containerId2 =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ Assert.assertTrue(rm1.waitForState(nm1, containerId2,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ // Acquire them, and NM report RUNNING
+ am1.allocate(null, null);
+ sentRMContainerLaunched(rm1, containerId2);
+ rm1.waitForContainerState(containerId2, RMContainerState.RUNNING);
+
+ // am1 asks to increase container2 from 2GB to 8GB (6GB delta)
+ am1.sendContainerResizingRequest(Arrays.asList(
+ ContainerResourceChangeRequest
+ .newInstance(containerId2, Resources.createResource(8 * GB))),
+ null);
+
+ checkPendingResource(rm1, "default", 6 * GB, null);
+ Assert.assertEquals(6 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+
+ // NM1 does 1 heartbeat
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
+
+ /* Check reservation statuses */
+ // Increase request should be reserved
+ Assert.assertTrue(rmContainer2.hasIncreaseReservation());
+ Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemory());
+ Assert.assertFalse(app.getReservedContainers().isEmpty());
+ Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+ // Pending resource will not be changed since it's not satisfied
+ checkPendingResource(rm1, "default", 6 * GB, null);
+ Assert.assertEquals(6 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+ // Queue/user/application's usage will be updated. Queue and user "used"
+ // here (9G) equals app used (1G + 2G) plus the 6G reservation.
+ checkUsedResource(rm1, "default", 9 * GB, null);
+ Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+ .getUser("user").getUsed().getMemory());
+ Assert.assertEquals(3 * GB,
+ app.getAppAttemptResourceUsage().getUsed().getMemory());
+ Assert.assertEquals(6 * GB,
+ app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+ // Complete container2, container will be unreserved and completed
+ am1.allocate(null, Arrays.asList(containerId2));
+
+ /* Check statuses after container2 was released */
+ // Increase request should be unreserved
+ Assert.assertTrue(app.getReservedContainers().isEmpty());
+ Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+ Assert.assertFalse(rmContainer2.hasIncreaseReservation());
+ // Pending increase resource is dropped together with container2
+ checkPendingResource(rm1, "default", 0 * GB, null);
+ Assert.assertEquals(0 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+ // Queue/user/application's usage will be updated; only the 1G AM
+ // container remains
+ checkUsedResource(rm1, "default", 1 * GB, null);
+ Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default"))
+ .getUser("user").getUsed().getMemory());
+ Assert.assertEquals(0 * GB,
+ app.getAppAttemptResourceUsage().getReserved().getMemory());
+ Assert.assertEquals(1 * GB,
+ app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+ rm1.close();
+ }
+
+ @Test
+ public void testIncreaseContainerUnreservedWhenApplicationCompleted()
+ throws Exception {
+ /**
+ * Similar to testIncreaseContainerUnreservedWhenContainerCompleted: the AM
+ * container is 1G, container2 is 2G and the app asks to increase container2
+ * to 8G on an 8G node, so the increase is reserved. When the application
+ * attempt is removed, the reserved increase should be cancelled and all of
+ * the app's usage released.
+ */
+ MockRM rm1 = new MockRM() {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ // NOTE(review): nm2 never heartbeats in this test; presumably it is
+ // registered only to add cluster capacity so the queue can hold 9G of
+ // used + reserved resource -- confirm.
+ MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+ // app1 -> default queue
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+ // Allocate one more container (container2, 2G)
+ am1.allocate(
+ Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(2 * GB), 1)),
+ null);
+ ContainerId containerId2 =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ Assert.assertTrue(
+ rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED,
+ 10 * 1000));
+ // Acquire them, and NM report RUNNING
+ am1.allocate(null, null);
+ sentRMContainerLaunched(rm1, containerId2);
+ // Make sure container2 is RUNNING before asking to resize it, as the
+ // sibling test and allocateAndLaunchContainers do
+ rm1.waitForContainerState(containerId2, RMContainerState.RUNNING);
+
+ // am1 asks to increase container2 from 2GB to 8GB (6GB delta)
+ am1.sendContainerResizingRequest(Arrays.asList(
+ ContainerResourceChangeRequest
+ .newInstance(containerId2, Resources.createResource(8 * GB))),
+ null);
+
+ checkPendingResource(rm1, "default", 6 * GB, null);
+ Assert.assertEquals(6 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+
+ // NM1 does 1 heartbeat
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
+
+ /* Check reservation statuses */
+ // Increase request should be reserved
+ Assert.assertTrue(rmContainer2.hasIncreaseReservation());
+ Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemory());
+ Assert.assertFalse(app.getReservedContainers().isEmpty());
+ Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+ // Pending resource will not be changed since it's not satisfied
+ checkPendingResource(rm1, "default", 6 * GB, null);
+ Assert.assertEquals(6 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+ // Queue/user/application's usage will be updated. Queue and user "used"
+ // here (9G) equals app used (1G + 2G) plus the 6G reservation.
+ checkUsedResource(rm1, "default", 9 * GB, null);
+ Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+ .getUser("user").getUsed().getMemory());
+ Assert.assertEquals(3 * GB,
+ app.getAppAttemptResourceUsage().getUsed().getMemory());
+ Assert.assertEquals(6 * GB,
+ app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+ // Kill the application
+ cs.handle(new AppAttemptRemovedSchedulerEvent(am1.getApplicationAttemptId(),
+ RMAppAttemptState.KILLED, false));
+
+ /* Check statuses after the application attempt was removed */
+ // Increase request should be unreserved
+ Assert.assertTrue(app.getReservedContainers().isEmpty());
+ Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+ Assert.assertFalse(rmContainer2.hasIncreaseReservation());
+ // Pending increase resource is cleared together with the attempt
+ checkPendingResource(rm1, "default", 0 * GB, null);
+ Assert.assertEquals(0 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+ // Queue/user/application's usage will be updated; nothing remains
+ checkUsedResource(rm1, "default", 0 * GB, null);
+ Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default"))
+ .getUser("user").getUsed().getMemory());
+ Assert.assertEquals(0 * GB,
+ app.getAppAttemptResourceUsage().getReserved().getMemory());
+ Assert.assertEquals(0 * GB,
+ app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+ rm1.close();
+ }
+
+ /**
+ * Requests {@code nContainer} containers of {@code mem} memory at the given
+ * priority and waits until the last expected container is ALLOCATED on
+ * {@code nm}. A second allocate call then acquires them, and each one is
+ * marked LAUNCHED and awaited until RUNNING. Container ids are assumed to
+ * be consecutive, starting at {@code startContainerId}.
+ */
+ private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
+ int nContainer, int mem, int priority, int startContainerId)
+ throws Exception {
+ ApplicationAttemptId attempt = am.getApplicationAttemptId();
+ ResourceRequest ask = ResourceRequest.newInstance(
+ Priority.newInstance(priority), "*", Resources.createResource(mem),
+ nContainer);
+ am.allocate(Arrays.asList(ask), null);
+
+ int endContainerId = startContainerId + nContainer;
+ ContainerId lastContainerId =
+ ContainerId.newContainerId(attempt, endContainerId - 1);
+ Assert.assertTrue(rm.waitForState(nm, lastContainerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ // Second allocate call acquires the allocated containers
+ am.allocate(null, null);
+
+ int cId = startContainerId;
+ while (cId < endContainerId) {
+ ContainerId launched = ContainerId.newContainerId(attempt, cId);
+ sentRMContainerLaunched(rm, launched);
+ rm.waitForContainerState(launched, RMContainerState.RUNNING);
+ cId++;
+ }
+ }
+
+ @Test
+ public void testOrderOfIncreaseContainerRequestAllocation()
+ throws Exception {
+ /**
+ * When multiple containers need to be increased, increase requests are
+ * served in priority order (the smaller priority value first); among
+ * containers with the same priority, the one with the smaller container id
+ * is preferred.
+ */
+ MockRM rm1 = new MockRM() {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+
+ // app1 -> default queue
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+ ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
+
+ // Container 2, 3 (priority=3)
+ allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2);
+
+ // Container 4, 5 (priority=2)
+ allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4);
+
+ // Container 6, 7 (priority=4)
+ allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6);
+
+ // am1 asks to change its container[2-7] from 1G to 2G, i.e. six 1G
+ // deltas (6G pending). The node (10G) already holds 1G AM + 6 x 1G, so
+ // only 3G is free and only three of the increases can fit.
+ List<ContainerResourceChangeRequest> increaseRequests = new ArrayList<>();
+ for (int cId = 2; cId <= 7; cId++) {
+ ContainerId containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), cId);
+ increaseRequests.add(ContainerResourceChangeRequest
+ .newInstance(containerId, Resources.createResource(2 * GB)));
+ }
+ am1.sendContainerResizingRequest(increaseRequests, null);
+
+ checkPendingResource(rm1, "default", 6 * GB, null);
+ Assert.assertEquals(6 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+
+ // Get rmNode1
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // assignContainer, container-4/5/2 increased (priority=2 first, then the
+ // earliest-allocated container of priority=3)
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ AllocateResponse allocateResponse = am1.allocate(null, null);
+ Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size());
+ verifyContainerIncreased(allocateResponse,
+ ContainerId.newContainerId(attemptId, 4), 2 * GB);
+ verifyContainerIncreased(allocateResponse,
+ ContainerId.newContainerId(attemptId, 5), 2 * GB);
+ verifyContainerIncreased(allocateResponse,
+ ContainerId.newContainerId(attemptId, 2), 2 * GB);
+
+ /* Check statuses after allocation */
+ // There're still 3 pending increase requests
+ checkPendingResource(rm1, "default", 3 * GB, null);
+ Assert.assertEquals(3 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+ // Queue/user/application's usage will be updated (node is now full)
+ checkUsedResource(rm1, "default", 10 * GB, null);
+ Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
+ .getUser("user").getUsed().getMemory());
+ Assert.assertEquals(0 * GB,
+ app.getAppAttemptResourceUsage().getReserved().getMemory());
+ Assert.assertEquals(10 * GB,
+ app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+ rm1.close();
+ }
+
+ @Test
+ public void testIncreaseContainerRequestGetPreferrence()
+ throws Exception {
+ /**
+ * There're multiple containers need to be increased, and there're several
+ * container allocation request, scheduler will try to increase container
+ * before allocate new containers
+ *
+ * NOTE(review): the body below is identical to
+ * testOrderOfIncreaseContainerRequestAllocation and never submits a
+ * new-container ResourceRequest, so the "increase before new allocation"
+ * preference described above is not actually exercised. TODO: add a
+ * pending new-container ask before the node heartbeat and assert it is
+ * not allocated while increases are outstanding.
+ */
+ MockRM rm1 = new MockRM() {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+
+ // app1 -> default queue
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+ ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
+
+ // Container 2, 3 (priority=3)
+ allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2);
+
+ // Container 4, 5 (priority=2)
+ allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4);
+
+ // Container 6, 7 (priority=4)
+ allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6);
+
+ // am1 asks to change its container[2-7] from 1G to 2G (6G pending; only
+ // 3G of the 10G node is free)
+ List<ContainerResourceChangeRequest> increaseRequests = new ArrayList<>();
+ for (int cId = 2; cId <= 7; cId++) {
+ ContainerId containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), cId);
+ increaseRequests.add(ContainerResourceChangeRequest
+ .newInstance(containerId, Resources.createResource(2 * GB)));
+ }
+ am1.sendContainerResizingRequest(increaseRequests, null);
+
+ checkPendingResource(rm1, "default", 6 * GB, null);
+ Assert.assertEquals(6 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+
+ // Get rmNode1
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // assignContainer, container-4/5/2 increased (which has highest priority OR
+ // earlier allocated)
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ AllocateResponse allocateResponse = am1.allocate(null, null);
+ Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size());
+ verifyContainerIncreased(allocateResponse,
+ ContainerId.newContainerId(attemptId, 4), 2 * GB);
+ verifyContainerIncreased(allocateResponse,
+ ContainerId.newContainerId(attemptId, 5), 2 * GB);
+ verifyContainerIncreased(allocateResponse,
+ ContainerId.newContainerId(attemptId, 2), 2 * GB);
+
+ /* Check statuses after allocation */
+ // There're still 3 pending increase requests
+ checkPendingResource(rm1, "default", 3 * GB, null);
+ Assert.assertEquals(3 * GB,
+ app.getAppAttemptResourceUsage().getPending().getMemory());
+ // Queue/user/application's usage will be updated
+ checkUsedResource(rm1, "default", 10 * GB, null);
+ Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
+ .getUser("user").getUsed().getMemory());
+ Assert.assertEquals(0 * GB,
+ app.getAppAttemptResourceUsage().getReserved().getMemory());
+ Assert.assertEquals(10 * GB,
+ app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+ rm1.close();
+ }
+
+ /**
+ * Asserts that queue {@code queueName} has exactly {@code memory} pending
+ * in partition {@code label}; a null label selects
+ * RMNodeLabelsManager.NO_LABEL.
+ */
+ private void checkPendingResource(MockRM rm, String queueName, int memory,
+ String label) {
+ String partition = (label != null) ? label : RMNodeLabelsManager.NO_LABEL;
+ CSQueue queue =
+ ((CapacityScheduler) rm.getResourceScheduler()).getQueue(queueName);
+ Assert.assertEquals(memory,
+ queue.getQueueResourceUsage().getPending(partition).getMemory());
+ }
+
+ /**
+ * Asserts that queue {@code queueName} has exactly {@code memory} used in
+ * partition {@code label}; a null label selects
+ * RMNodeLabelsManager.NO_LABEL.
+ */
+ private void checkUsedResource(MockRM rm, String queueName, int memory,
+ String label) {
+ String partition = (label != null) ? label : RMNodeLabelsManager.NO_LABEL;
+ CSQueue queue =
+ ((CapacityScheduler) rm.getResourceScheduler()).getQueue(queueName);
+ Assert.assertEquals(memory,
+ queue.getQueueResourceUsage().getUsed(partition).getMemory());
+ }
+
+ /**
+ * Asserts that {@code response} lists {@code containerId} among its
+ * increased containers, and that every matching entry has {@code mem}
+ * memory.
+ */
+ private void verifyContainerIncreased(AllocateResponse response,
+ ContainerId containerId, int mem) {
+ int matches = 0;
+ for (Container increased : response.getIncreasedContainers()) {
+ if (!increased.getId().equals(containerId)) {
+ continue;
+ }
+ matches++;
+ Assert.assertEquals(mem, increased.getResource().getMemory());
+ }
+ if (matches == 0) {
+ Assert.fail("Container not increased: containerId=" + containerId);
+ }
+ }
+
+ /**
+ * Asserts that {@code response} lists {@code containerId} among its
+ * decreased containers, and that every matching entry has {@code mem}
+ * memory.
+ */
+ private void verifyContainerDecreased(AllocateResponse response,
+ ContainerId containerId, int mem) {
+ int matches = 0;
+ for (Container decreased : response.getDecreasedContainers()) {
+ if (!decreased.getId().equals(containerId)) {
+ continue;
+ }
+ matches++;
+ Assert.assertEquals(mem, decreased.getResource().getMemory());
+ }
+ if (matches == 0) {
+ Assert.fail("Container not decreased: containerId=" + containerId);
+ }
+ }
+
+ /**
+ * Delivers a LAUNCHED event directly to the RMContainer with the given id,
+ * looked up through the CapacityScheduler; fails the test if the scheduler
+ * does not know the container.
+ */
+ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
+ RMContainer launched = ((CapacityScheduler) rm.getResourceScheduler())
+ .getRMContainer(containerId);
+ if (launched == null) {
+ Assert.fail("Cannot find RMContainer");
+ return;
+ }
+ launched.handle(
+ new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+ }
+
+ /**
+ * Asserts that the scheduler's view of node {@code nodeId} reports exactly
+ * {@code expectedMemory} available memory.
+ */
+ private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId,
+ int expectedMemory) {
+ SchedulerNode schedulerNode =
+ ((CapacityScheduler) rm.getResourceScheduler()).getNode(nodeId);
+ Assert.assertEquals(expectedMemory,
+ schedulerNode.getAvailableResource().getMemory());
+ }
+
+ /**
+ * Returns the current attempt of application {@code appId} as tracked by
+ * the RM's CapacityScheduler.
+ */
+ private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
+ ApplicationId appId) {
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm.getResourceScheduler();
+ return scheduler.getSchedulerApplications()
+ .get(appId)
+ .getCurrentAppAttempt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index fe8be06..b85c697 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -770,9 +770,9 @@ public class TestLeafQueue {
qb.finishApplication(app_0.getApplicationId(), user_0);
qb.finishApplication(app_2.getApplicationId(), user_1);
qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority),
- null, null);
+ null, null, false);
qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority),
- null, null);
+ null, null, false);
qb.setUserLimit(50);
qb.setUserLimitFactor(1);