You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/08/16 00:28:35 UTC
[41/50] [abbrv] hadoop git commit: YARN-5978. ContainerScheduler and
ContainerManager changes to support ExecType update. (Kartheek Muthyala via
asuresh)
YARN-5978. ContainerScheduler and ContainerManager changes to support ExecType update. (Kartheek Muthyala via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d7be1d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d7be1d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d7be1d8
Branch: refs/heads/YARN-6592
Commit: 4d7be1d8575e9254c59d41460960708e3718503a
Parents: 0446511
Author: Arun Suresh <as...@apache.org>
Authored: Mon Aug 14 19:46:17 2017 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Mon Aug 14 19:46:17 2017 -0700
----------------------------------------------------------------------
.../yarn/client/api/impl/TestAMRMClient.java | 395 +++++++++++++++++--
.../yarn/client/api/impl/TestNMClient.java | 7 +-
.../containermanager/ContainerManagerImpl.java | 132 ++++---
.../containermanager/container/Container.java | 4 +-
.../container/ContainerImpl.java | 37 +-
.../monitor/ContainersMonitorImpl.java | 15 -
.../scheduler/ContainerScheduler.java | 73 ++++
.../scheduler/ContainerSchedulerEventType.java | 1 +
.../UpdateContainerSchedulerEvent.java | 85 ++++
.../nodemanager/TestNodeManagerResync.java | 11 +-
.../BaseContainerManagerTest.java | 33 +-
.../containermanager/TestContainerManager.java | 267 ++++++++-----
.../TestContainerManagerRecovery.java | 2 +-
.../TestContainerSchedulerQueuing.java | 96 +++++
.../nodemanager/webapp/MockContainer.java | 2 +-
.../scheduler/SchedulerApplicationAttempt.java | 2 +-
.../security/RMContainerTokenSecretManager.java | 30 +-
17 files changed, 964 insertions(+), 228 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 1b2bca3..09b12f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -36,6 +37,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -142,6 +144,10 @@ public class TestAMRMClient {
// set the minimum allocation so that resource decrease can go under 1024
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+ conf.setBoolean(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+ conf.setInt(
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
@@ -924,8 +930,8 @@ public class TestAMRMClient {
// add exp=x to ANY
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "x"));
- Assert.assertEquals(1, client.ask.size());
- Assert.assertEquals("x", client.ask.iterator().next()
+ assertEquals(1, client.ask.size());
+ assertEquals("x", client.ask.iterator().next()
.getNodeLabelExpression());
// add exp=x then add exp=a to ANY in same priority, only exp=a should kept
@@ -933,8 +939,8 @@ public class TestAMRMClient {
1), null, null, Priority.UNDEFINED, true, "x"));
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "a"));
- Assert.assertEquals(1, client.ask.size());
- Assert.assertEquals("a", client.ask.iterator().next()
+ assertEquals(1, client.ask.size());
+ assertEquals("a", client.ask.iterator().next()
.getNodeLabelExpression());
// add exp=x to ANY, rack and node, only resource request has ANY resource
@@ -943,10 +949,10 @@ public class TestAMRMClient {
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true,
"y"));
- Assert.assertEquals(1, client.ask.size());
+ assertEquals(1, client.ask.size());
for (ResourceRequest req : client.ask) {
if (ResourceRequest.ANY.equals(req.getResourceName())) {
- Assert.assertEquals("y", req.getNodeLabelExpression());
+ assertEquals("y", req.getNodeLabelExpression());
} else {
Assert.assertNull(req.getNodeLabelExpression());
}
@@ -957,7 +963,7 @@ public class TestAMRMClient {
new String[] { "node1", "node2" }, Priority.UNDEFINED, true, "y"));
for (ResourceRequest req : client.ask) {
if (ResourceRequest.ANY.equals(req.getResourceName())) {
- Assert.assertEquals("y", req.getNodeLabelExpression());
+ assertEquals("y", req.getNodeLabelExpression());
} else {
Assert.assertNull(req.getNodeLabelExpression());
}
@@ -971,7 +977,7 @@ public class TestAMRMClient {
} catch (InvalidContainerRequestException e) {
return;
}
- Assert.fail();
+ fail();
}
@Test(timeout=30000)
@@ -1042,7 +1048,8 @@ public class TestAMRMClient {
// get allocations
AllocateResponse allocResponse = amClient.allocate(0.1f);
List<Container> containers = allocResponse.getAllocatedContainers();
- Assert.assertEquals(num, containers.size());
+ assertEquals(num, containers.size());
+
// build container launch context
Credentials ts = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
@@ -1083,14 +1090,14 @@ public class TestAMRMClient {
private void doContainerResourceChange(
final AMRMClient<ContainerRequest> amClient, List<Container> containers)
throws YarnException, IOException {
- Assert.assertEquals(3, containers.size());
+ assertEquals(3, containers.size());
// remember the container IDs
Container container1 = containers.get(0);
Container container2 = containers.get(1);
Container container3 = containers.get(2);
AMRMClientImpl<ContainerRequest> amClientImpl =
(AMRMClientImpl<ContainerRequest>) amClient;
- Assert.assertEquals(0, amClientImpl.change.size());
+ assertEquals(0, amClientImpl.change.size());
// verify newer request overwrites older request for the container1
amClientImpl.requestContainerUpdate(container1,
UpdateContainerRequest.newInstance(container1.getVersion(),
@@ -1100,21 +1107,21 @@ public class TestAMRMClient {
UpdateContainerRequest.newInstance(container1.getVersion(),
container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(4096, 1), null));
- Assert.assertEquals(Resource.newInstance(4096, 1),
+ assertEquals(Resource.newInstance(4096, 1),
amClientImpl.change.get(container1.getId()).getValue().getCapability());
// verify new decrease request cancels old increase request for container1
amClientImpl.requestContainerUpdate(container1,
UpdateContainerRequest.newInstance(container1.getVersion(),
container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
Resource.newInstance(512, 1), null));
- Assert.assertEquals(Resource.newInstance(512, 1),
+ assertEquals(Resource.newInstance(512, 1),
amClientImpl.change.get(container1.getId()).getValue().getCapability());
// request resource increase for container2
amClientImpl.requestContainerUpdate(container2,
UpdateContainerRequest.newInstance(container2.getVersion(),
container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(2048, 1), null));
- Assert.assertEquals(Resource.newInstance(2048, 1),
+ assertEquals(Resource.newInstance(2048, 1),
amClientImpl.change.get(container2.getId()).getValue().getCapability());
// verify release request will cancel pending change requests for the same
// container
@@ -1122,27 +1129,357 @@ public class TestAMRMClient {
UpdateContainerRequest.newInstance(container3.getVersion(),
container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(2048, 1), null));
- Assert.assertEquals(3, amClientImpl.pendingChange.size());
+ assertEquals(3, amClientImpl.pendingChange.size());
amClientImpl.releaseAssignedContainer(container3.getId());
- Assert.assertEquals(2, amClientImpl.pendingChange.size());
+ assertEquals(2, amClientImpl.pendingChange.size());
// as of now: container1 asks to decrease to (512, 1)
// container2 asks to increase to (2048, 1)
// send allocation requests
AllocateResponse allocResponse = amClient.allocate(0.1f);
- Assert.assertEquals(0, amClientImpl.change.size());
+ assertEquals(0, amClientImpl.change.size());
// we should get decrease confirmation right away
List<UpdatedContainer> updatedContainers =
allocResponse.getUpdatedContainers();
- Assert.assertEquals(1, updatedContainers.size());
+ assertEquals(1, updatedContainers.size());
// we should get increase allocation after the next NM's heartbeat to RM
triggerSchedulingWithNMHeartBeat();
// get allocations
allocResponse = amClient.allocate(0.1f);
updatedContainers =
allocResponse.getUpdatedContainers();
- Assert.assertEquals(1, updatedContainers.size());
+ assertEquals(1, updatedContainers.size());
+ }
+
+ @Test(timeout=60000)
+ public void testAMRMClientWithContainerPromotion()
+ throws YarnException, IOException {
+ AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
+ (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
+ .createAMRMClient();
+ // asserting we are using the singleton instance cache
+ Assert.assertSame(NMTokenCache.getSingleton(),
+ amClient.getNMTokenCache());
+ amClient.init(conf);
+ amClient.start();
+
+ // start am nm client
+ NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
+ Assert.assertNotNull(nmClient);
+ // asserting we are using the singleton instance cache
+ Assert.assertSame(
+ NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
+ nmClient.init(conf);
+ nmClient.start();
+ assertEquals(STATE.STARTED, nmClient.getServiceState());
+
+ amClient.registerApplicationMaster("Host", 10000, "");
+ // setup container request
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // START OPPORTUNISTIC Container, Send allocation request to RM
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null, ExecutionTypeRequest
+ .newInstance(ExecutionType.OPPORTUNISTIC, true)));
+
+ int oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(1, oppContainersRequestedAny);
+ assertEquals(1, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate the container; poll allocate() up to 50 times
+ int allocatedContainerCount = 0;
+ Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
+ int iterationsLeft = 50;
+
+ amClient.getNMTokenCache().clearCache();
+ assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+
+ AllocateResponse allocResponse = null;
+ while (allocatedContainerCount < oppContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ allocResponse = amClient.allocate(0.1f);
+ // let NM heartbeat to RM and trigger allocations
+ //triggerSchedulingWithNMHeartBeat();
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ allocatedOpportContainers.put(container.getId(), container);
+ }
+ }
+ if (allocatedContainerCount < oppContainersRequestedAny) {
+ // sleep to let the NM heartbeat to the RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+ assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
+
+ startContainer(allocResponse, nmClient);
+
+ // SEND PROMOTION REQUEST TO RM
+ try {
+ Container c = allocatedOpportContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+ null, ExecutionType.OPPORTUNISTIC));
+ fail("Should throw Exception..");
+ } catch (IllegalArgumentException e) {
+ System.out.println("## " + e.getMessage());
+ assertTrue(e.getMessage().contains(
+ "target should be GUARANTEED and original should be OPPORTUNISTIC"));
+ }
+
+ Container c = allocatedOpportContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+ null, ExecutionType.GUARANTEED));
+ iterationsLeft = 120;
+ Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+ // poll allocate() until the RM returns the promoted container
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+ // heartbeat the RM and pick up any container updates
+ allocResponse = amClient.allocate(0.1f);
+ // record every updated container returned in this response
+ if (allocResponse.getUpdatedContainers() != null) {
+ for (UpdatedContainer updatedContainer : allocResponse
+ .getUpdatedContainers()) {
+ System.out.println("Got update..");
+ updatedContainers.put(updatedContainer.getContainer().getId(),
+ updatedContainer);
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to give the NM time to heartbeat to the RM
+ sleep(100);
+ }
+ }
+ assertEquals(1, updatedContainers.size());
+
+ for (ContainerId cId : allocatedOpportContainers.keySet()) {
+ Container orig = allocatedOpportContainers.get(cId);
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
+ assertNotNull(updatedContainer);
+ assertEquals(ExecutionType.GUARANTEED,
+ updatedContainer.getContainer().getExecutionType());
+ assertEquals(orig.getResource(),
+ updatedContainer.getContainer().getResource());
+ assertEquals(orig.getNodeId(),
+ updatedContainer.getContainer().getNodeId());
+ assertEquals(orig.getVersion() + 1,
+ updatedContainer.getContainer().getVersion());
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // SEND EXECTYPE UPDATE TO NM
+ updateContainerExecType(allocResponse, ExecutionType.GUARANTEED, nmClient);
+
+ amClient.ask.clear();
+ }
+
+ @Test(timeout=60000)
+ public void testAMRMClientWithContainerDemotion()
+ throws YarnException, IOException {
+ AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
+ (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
+ .createAMRMClient();
+ // asserting we are using the singleton instance cache
+ Assert.assertSame(NMTokenCache.getSingleton(),
+ amClient.getNMTokenCache());
+ amClient.init(conf);
+ amClient.start();
+
+ NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
+ Assert.assertNotNull(nmClient);
+ // asserting we are using the singleton instance cache
+ Assert.assertSame(
+ NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
+ nmClient.init(conf);
+ nmClient.start();
+ assertEquals(STATE.STARTED, nmClient.getServiceState());
+
+ amClient.registerApplicationMaster("Host", 10000, "");
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // START GUARANTEED Container, Send allocation request to RM
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null, ExecutionTypeRequest
+ .newInstance(ExecutionType.GUARANTEED, true)));
+
+ int oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(1, oppContainersRequestedAny);
+ assertEquals(1, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate the container; poll allocate() up to 50 times
+ int allocatedContainerCount = 0;
+ Map<ContainerId, Container> allocatedGuaranteedContainers = new HashMap<>();
+ int iterationsLeft = 50;
+
+ amClient.getNMTokenCache().clearCache();
+ assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+
+ AllocateResponse allocResponse = null;
+ while (allocatedContainerCount < oppContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ allocResponse = amClient.allocate(0.1f);
+ // let NM heartbeat to RM and trigger allocations
+ //triggerSchedulingWithNMHeartBeat();
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+ allocatedGuaranteedContainers.put(container.getId(), container);
+ }
+ }
+ if (allocatedContainerCount < oppContainersRequestedAny) {
+ // sleep to let the NM heartbeat to the RM and trigger allocations
+ sleep(100);
+ }
+ }
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+ assertEquals(oppContainersRequestedAny,
+ allocatedGuaranteedContainers.size());
+ startContainer(allocResponse, nmClient);
+
+ // SEND DEMOTION REQUEST TO RM
+ try {
+ Container c = allocatedGuaranteedContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+ null, ExecutionType.GUARANTEED));
+ fail("Should throw Exception..");
+ } catch (IllegalArgumentException e) {
+ System.out.println("## " + e.getMessage());
+ assertTrue(e.getMessage().contains(
+ "target should be OPPORTUNISTIC and original should be GUARANTEED"));
+ }
+
+ Container c = allocatedGuaranteedContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+ null, ExecutionType.OPPORTUNISTIC));
+ iterationsLeft = 120;
+ Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+ // poll allocate() until the RM returns the demoted container
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+ // heartbeat the RM and pick up any container updates
+ allocResponse = amClient.allocate(0.1f);
+ // record every updated container returned in this response
+ if (allocResponse.getUpdatedContainers() != null) {
+ for (UpdatedContainer updatedContainer : allocResponse
+ .getUpdatedContainers()) {
+ System.out.println("Got update..");
+ updatedContainers.put(updatedContainer.getContainer().getId(),
+ updatedContainer);
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to give the NM time to heartbeat to the RM
+ sleep(100);
+ }
+ }
+ assertEquals(1, updatedContainers.size());
+
+ for (ContainerId cId : allocatedGuaranteedContainers.keySet()) {
+ Container orig = allocatedGuaranteedContainers.get(cId);
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
+ assertNotNull(updatedContainer);
+ assertEquals(ExecutionType.OPPORTUNISTIC,
+ updatedContainer.getContainer().getExecutionType());
+ assertEquals(orig.getResource(),
+ updatedContainer.getContainer().getResource());
+ assertEquals(orig.getNodeId(),
+ updatedContainer.getContainer().getNodeId());
+ assertEquals(orig.getVersion() + 1,
+ updatedContainer.getContainer().getVersion());
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ updateContainerExecType(allocResponse, ExecutionType.OPPORTUNISTIC,
+ nmClient);
+ amClient.ask.clear();
+ }
+
+ private void updateContainerExecType(AllocateResponse allocResponse,
+ ExecutionType expectedExecType, NMClientImpl nmClient)
+ throws IOException, YarnException {
+ for (UpdatedContainer updatedContainer : allocResponse
+ .getUpdatedContainers()) {
+ Container container = updatedContainer.getContainer();
+ nmClient.increaseContainerResource(container);
+ // Poll until the NM reports the expected execution type; the loop is
+ // unbounded, so a missed update is caught only by the @Test timeout
+ while (true) {
+ ContainerStatus status = nmClient
+ .getContainerStatus(container.getId(), container.getNodeId());
+ if (status.getExecutionType() == expectedExecType) {
+ break;
+ }
+ sleep(10);
+ }
+ }
+ }
+
+ private void startContainer(AllocateResponse allocResponse,
+ NMClientImpl nmClient) throws IOException, YarnException {
+ // START THE CONTAINER IN NM
+ // build container launch context
+ Credentials ts = new Credentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens =
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ // start a process long enough for increase/decrease action to take effect
+ ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext(
+ Collections.<String, LocalResource>emptyMap(),
+ new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+ new HashMap<String, ByteBuffer>(), securityTokens,
+ new HashMap<ApplicationAccessType, String>());
+ // start the containers and make sure they are in RUNNING state
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ nmClient.startContainer(container, clc);
+ // NodeManager may still need some time to get the stable
+ // container status
+ while (true) {
+ ContainerStatus status = nmClient
+ .getContainerStatus(container.getId(), container.getNodeId());
+ if (status.getState() == ContainerState.RUNNING) {
+ break;
+ }
+ sleep(10);
+ }
+ }
}
+
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
throws YarnException, IOException {
// setup container request
@@ -1172,7 +1509,7 @@ public class TestAMRMClient {
Set<ContainerId> releases = new TreeSet<ContainerId>();
amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
+ assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
while (allocatedContainerCount < containersRequestedAny
@@ -1192,7 +1529,7 @@ public class TestAMRMClient {
for (NMToken token : allocResponse.getNMTokens()) {
String nodeID = token.getNodeId().toString();
if (receivedNMTokens.containsKey(nodeID)) {
- Assert.fail("Received token again for : " + nodeID);
+ fail("Received token again for : " + nodeID);
}
receivedNMTokens.put(nodeID, token.getToken());
}
@@ -1204,7 +1541,7 @@ public class TestAMRMClient {
}
// Should receive atleast 1 token
- Assert.assertTrue(receivedNMTokens.size() > 0
+ assertTrue(receivedNMTokens.size() > 0
&& receivedNMTokens.size() <= nodeCount);
assertEquals(allocatedContainerCount, containersRequestedAny);
@@ -1444,7 +1781,7 @@ public class TestAMRMClient {
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_1 =
getAMRMToken();
Assert.assertNotNull(amrmToken_1);
- Assert.assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
+ assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
// Wait for enough time and make sure the roll_over happens
@@ -1459,7 +1796,7 @@ public class TestAMRMClient {
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_2 =
getAMRMToken();
Assert.assertNotNull(amrmToken_2);
- Assert.assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
+ assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
Assert.assertNotEquals(amrmToken_1, amrmToken_2);
@@ -1474,7 +1811,7 @@ public class TestAMRMClient {
AMRMTokenIdentifierForTest newVersionTokenIdentifier =
new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message");
- Assert.assertEquals("Message is changed after set to newVersionTokenIdentifier",
+ assertEquals("Message is changed after set to newVersionTokenIdentifier",
"message", newVersionTokenIdentifier.getMessage());
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> (
@@ -1530,10 +1867,10 @@ public class TestAMRMClient {
.getBindAddress(), conf);
}
}).allocate(Records.newRecord(AllocateRequest.class));
- Assert.fail("The old Token should not work");
+ fail("The old Token should not work");
} catch (Exception ex) {
- Assert.assertTrue(ex instanceof InvalidToken);
- Assert.assertTrue(ex.getMessage().contains(
+ assertTrue(ex instanceof InvalidToken);
+ assertTrue(ex.getMessage().contains(
"Invalid AMRMToken from "
+ amrmToken_2.decodeIdentifier().getApplicationAttemptId()));
}
@@ -1560,7 +1897,7 @@ public class TestAMRMClient {
org.apache.hadoop.security.token.Token<?> token = iter.next();
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
if (result != null) {
- Assert.fail("credentials has more than one AMRM token."
+ fail("credentials has more than one AMRM token."
+ " token1: " + result + " token2: " + token);
}
result = (org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index 6bd0816..9b79e2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -301,7 +301,6 @@ public class TestNMClient {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("is not handled by this NodeManager"));
}
-
// increaseContainerResource shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
@@ -475,10 +474,10 @@ public class TestNMClient {
try {
nmClient.increaseContainerResource(container);
} catch (YarnException e) {
- // NM container will only be in SCHEDULED state, so expect the increase
- // action to fail.
+ // Increasing the container resource should fail because the token's
+ // version has not been bumped past the NM's current version.
if (!e.getMessage().contains(
- "can only be changed when a container is in RUNNING state")) {
+ container.getId() + " has update version ")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 84ed3c1..a1e8ca0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -136,13 +137,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ChangeMonitoringContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@@ -410,8 +412,24 @@ public class ContainerManagerImpl extends CompositeService implements
throws IOException {
StartContainerRequest req = rcs.getStartRequest();
ContainerLaunchContext launchContext = req.getContainerLaunchContext();
- ContainerTokenIdentifier token =
- BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+ ContainerTokenIdentifier token = null;
+ if(rcs.getCapability() != null) {
+ ContainerTokenIdentifier originalToken =
+ BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+ token = new ContainerTokenIdentifier(originalToken.getContainerID(),
+ originalToken.getVersion(), originalToken.getNmHostAddress(),
+ originalToken.getApplicationSubmitter(), rcs.getCapability(),
+ originalToken.getExpiryTimeStamp(), originalToken.getMasterKeyId(),
+ originalToken.getRMIdentifier(), originalToken.getPriority(),
+ originalToken.getCreationTime(),
+ originalToken.getLogAggregationContext(),
+ originalToken.getNodeLabelExpression(),
+ originalToken.getContainerType(), originalToken.getExecutionType());
+
+ } else {
+ token = BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+ }
+
ContainerId containerId = token.getContainerID();
ApplicationId appId =
containerId.getApplicationAttemptId().getApplicationId();
@@ -1183,9 +1201,7 @@ public class ContainerManagerImpl extends CompositeService implements
// as container resource increase request will have come with
// an updated NMToken.
updateNMTokenIdentifier(nmTokenIdentifier);
- Resource resource = containerTokenIdentifier.getResource();
- changeContainerResourceInternal(containerId,
- containerTokenIdentifier.getVersion(), resource, true);
+ updateContainerInternal(containerId, containerTokenIdentifier);
successfullyUpdatedContainers.add(containerId);
} catch (YarnException | InvalidToken e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
@@ -1199,9 +1215,9 @@ public class ContainerManagerImpl extends CompositeService implements
}
@SuppressWarnings("unchecked")
- private void changeContainerResourceInternal(ContainerId containerId,
- int containerVersion, Resource targetResource, boolean increase)
- throws YarnException, IOException {
+ private void updateContainerInternal(ContainerId containerId,
+ ContainerTokenIdentifier containerTokenIdentifier)
+ throws YarnException, IOException {
Container container = context.getContainers().get(containerId);
// Check container existence
if (container == null) {
@@ -1213,64 +1229,77 @@ public class ContainerManagerImpl extends CompositeService implements
+ " is not handled by this NodeManager");
}
}
+ // Check container version.
+ int currentVersion = container.getContainerTokenIdentifier().getVersion();
+ if (containerTokenIdentifier.getVersion() <= currentVersion) {
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " has update version [" + currentVersion + "] >= requested version"
+ + " [" + containerTokenIdentifier.getVersion() + "]");
+ }
+
// Check container state
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState currentState =
container.getContainerState();
if (currentState != org.apache.hadoop.yarn.server.
- nodemanager.containermanager.container.ContainerState.RUNNING) {
+ nodemanager.containermanager.container.ContainerState.RUNNING &&
+ currentState != org.apache.hadoop.yarn.server.
+ nodemanager.containermanager.container.ContainerState.SCHEDULED) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " is in " + currentState.name() + " state."
+ " Resource can only be changed when a container is in"
- + " RUNNING state");
+ + " RUNNING or SCHEDULED state");
}
+
// Check validity of the target resource.
Resource currentResource = container.getResource();
- if (currentResource.equals(targetResource)) {
- LOG.warn("Unable to change resource for container "
- + containerId.toString()
- + ". The target resource "
- + targetResource.toString()
- + " is the same as the current resource");
- return;
- }
- if (increase && !Resources.fitsIn(currentResource, targetResource)) {
- throw RPCUtil.getRemoteException("Unable to increase resource for "
- + "container " + containerId.toString()
- + ". The target resource "
- + targetResource.toString()
- + " is smaller than the current resource "
- + currentResource.toString());
- }
- if (!increase &&
- (!Resources.fitsIn(Resources.none(), targetResource)
- || !Resources.fitsIn(targetResource, currentResource))) {
- throw RPCUtil.getRemoteException("Unable to decrease resource for "
- + "container " + containerId.toString()
- + ". The target resource "
- + targetResource.toString()
- + " is not smaller than the current resource "
- + currentResource.toString());
- }
- if (increase) {
- org.apache.hadoop.yarn.api.records.Container increasedContainer =
- org.apache.hadoop.yarn.api.records.Container.newInstance(
- containerId, null, null, targetResource, null, null);
+ ExecutionType currentExecType =
+ container.getContainerTokenIdentifier().getExecutionType();
+ boolean isResourceChange = false;
+ boolean isExecTypeUpdate = false;
+ Resource targetResource = containerTokenIdentifier.getResource();
+ ExecutionType targetExecType = containerTokenIdentifier.getExecutionType();
+
+ // Is true if either the resources has increased or execution type
+ // updated from opportunistic to guaranteed
+ boolean isIncrease = false;
+ if (!currentResource.equals(targetResource)) {
+ isResourceChange = true;
+ isIncrease = Resources.fitsIn(currentResource, targetResource)
+ && !Resources.fitsIn(targetResource, currentResource);
+ } else if (!currentExecType.equals(targetExecType)) {
+ isExecTypeUpdate = true;
+ isIncrease = currentExecType == ExecutionType.OPPORTUNISTIC &&
+ targetExecType == ExecutionType.GUARANTEED;
+ }
+ if (isIncrease) {
+ org.apache.hadoop.yarn.api.records.Container increasedContainer = null;
+ if (isResourceChange) {
+ increasedContainer =
+ org.apache.hadoop.yarn.api.records.Container.newInstance(
+ containerId, null, null, targetResource, null, null,
+ currentExecType);
+ } else {
+ increasedContainer =
+ org.apache.hadoop.yarn.api.records.Container.newInstance(
+ containerId, null, null, currentResource, null, null,
+ targetExecType);
+ }
if (context.getIncreasedContainers().putIfAbsent(containerId,
increasedContainer) != null){
throw RPCUtil.getRemoteException("Container " + containerId.toString()
- + " resource is being increased.");
+ + " resource is being increased -or- " +
+ "is undergoing ExecutionType promoted.");
}
}
this.readLock.lock();
try {
if (!serviceStopped) {
- // Persist container resource change for recovery
- this.context.getNMStateStore().storeContainerResourceChanged(
- containerId, containerVersion, targetResource);
- getContainersMonitor().handle(
- new ChangeMonitoringContainerResourceEvent(
- containerId, targetResource));
+ // Dispatch message to ContainerScheduler to actually
+ // make the change.
+ dispatcher.getEventHandler().handle(new UpdateContainerSchedulerEvent(
+ container, containerTokenIdentifier, isResourceChange,
+ isExecTypeUpdate, isIncrease));
} else {
throw new YarnException(
"Unable to change container resource as the NodeManager is "
@@ -1571,8 +1600,11 @@ public class ContainerManagerImpl extends CompositeService implements
for (org.apache.hadoop.yarn.api.records.Container container
: containersDecreasedEvent.getContainersToDecrease()) {
try {
- changeContainerResourceInternal(container.getId(),
- container.getVersion(), container.getResource(), false);
+ ContainerTokenIdentifier containerTokenIdentifier =
+ BuilderUtils.newContainerTokenIdentifier(
+ container.getContainerToken());
+ updateContainerInternal(container.getId(),
+ containerTokenIdentifier);
} catch (YarnException e) {
LOG.error("Unable to decrease container resource", e);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index bd3f06d..f6e567c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -39,10 +39,10 @@ public interface Container extends EventHandler<ContainerEvent> {
Resource getResource();
- void setResource(Resource targetResource);
-
ContainerTokenIdentifier getContainerTokenIdentifier();
+ void setContainerTokenIdentifier(ContainerTokenIdentifier token);
+
String getUser();
ContainerState getContainerState();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index c0aa6b0..734a27b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -148,9 +148,8 @@ public class ContainerImpl implements Container {
private final Credentials credentials;
private final NodeManagerMetrics metrics;
private volatile ContainerLaunchContext launchContext;
- private final ContainerTokenIdentifier containerTokenIdentifier;
+ private volatile ContainerTokenIdentifier containerTokenIdentifier;
private final ContainerId containerId;
- private volatile Resource resource;
private final String user;
private int version;
private int exitCode = ContainerExitStatus.INVALID;
@@ -201,7 +200,6 @@ public class ContainerImpl implements Container {
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
this.containerTokenIdentifier = containerTokenIdentifier;
this.containerId = containerTokenIdentifier.getContainerID();
- this.resource = containerTokenIdentifier.getResource();
this.diagnostics = new StringBuilder();
this.credentials = creds;
this.metrics = metrics;
@@ -269,13 +267,6 @@ public class ContainerImpl implements Container {
this.exitCode = rcs.getExitCode();
this.recoveredAsKilled = rcs.getKilled();
this.diagnostics.append(rcs.getDiagnostics());
- Resource recoveredCapability = rcs.getCapability();
- if (recoveredCapability != null
- && !this.resource.equals(recoveredCapability)) {
- // resource capability had been updated before NM was down
- this.resource = Resource.newInstance(recoveredCapability.getMemorySize(),
- recoveredCapability.getVirtualCores());
- }
this.version = rcs.getVersion();
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
this.workDir = rcs.getWorkDir();
@@ -640,14 +631,8 @@ public class ContainerImpl implements Container {
@Override
public Resource getResource() {
- return Resources.clone(this.resource);
- }
-
- @Override
- public void setResource(Resource targetResource) {
- Resource currentResource = getResource();
- this.resource = Resources.clone(targetResource);
- this.metrics.changeContainer(currentResource, targetResource);
+ return Resources.clone(
+ this.containerTokenIdentifier.getResource());
}
@Override
@@ -661,6 +646,16 @@ public class ContainerImpl implements Container {
}
@Override
+ public void setContainerTokenIdentifier(ContainerTokenIdentifier token) {
+ // Replaces the token backing this container. getResource() and the
+ // metrics.releaseContainer(...) calls in the finish transitions read the
+ // resource from this token, so a resource update takes effect here.
+ // NOTE(review): the removed setResource() also called
+ // metrics.changeContainer(old, new); this setter does not. Confirm the
+ // NM allocated-resource metrics stay consistent after a resize.
+ this.writeLock.lock();
+ try {
+ this.containerTokenIdentifier = token;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Override
public String getWorkDir() {
return workDir;
}
@@ -833,7 +828,8 @@ public class ContainerImpl implements Container {
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
- container.metrics.releaseContainer(container.resource);
+ container.metrics.releaseContainer(
+ container.containerTokenIdentifier.getResource());
container.sendFinishedEvents();
return ContainerState.DONE;
}
@@ -1517,7 +1513,8 @@ public class ContainerImpl implements Container {
@Override
@SuppressWarnings("unchecked")
public void transition(ContainerImpl container, ContainerEvent event) {
- container.metrics.releaseContainer(container.resource);
+ container.metrics.releaseContainer(
+ container.containerTokenIdentifier.getResource());
if (container.containerMetrics != null) {
container.containerMetrics
.recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 6ee60bd..13e7491 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -741,19 +741,6 @@ public class ContainersMonitorImpl extends AbstractService implements
}
}
- private void changeContainerResource(
- ContainerId containerId, Resource resource) {
- Container container = context.getContainers().get(containerId);
- // Check container existence
- if (container == null) {
- LOG.warn("Container " + containerId.toString() + "does not exist");
- return;
- }
- // YARN-5860: Route this through the ContainerScheduler to
- // fix containerAllocation
- container.setResource(resource);
- }
-
private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
if (!containerMetricsEnabled || monitoringEvent == null) {
return;
@@ -902,8 +889,6 @@ public class ContainersMonitorImpl extends AbstractService implements
int cpuVcores = changeEvent.getResource().getVirtualCores();
processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
}
-
- changeContainerResource(containerId, changeEvent.getResource());
}
private void onStopMonitoringContainer(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 60d6213..19b4505 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -31,6 +31,9 @@ import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
+ .ChangeMonitoringContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@@ -136,6 +139,13 @@ public class ContainerScheduler extends AbstractService implements
case CONTAINER_COMPLETED:
onContainerCompleted(event.getContainer());
break;
+ case UPDATE_CONTAINER:
+ if (event instanceof UpdateContainerSchedulerEvent) {
+ onUpdateContainer((UpdateContainerSchedulerEvent) event);
+ } else {
+ LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
+ }
+ break;
case SHED_QUEUED_CONTAINERS:
shedQueuedOpportunisticContainers();
break;
@@ -146,6 +156,69 @@ public class ContainerScheduler extends AbstractService implements
}
/**
+ * We assume that the ContainerManager has already figured out what kind
+ * of update this is.
+ */
+  private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
+    ContainerId containerId = updateEvent.getContainer().getContainerId();
+    if (updateEvent.isResourceChange()) {
+      if (runningContainers.containsKey(containerId)) {
+        // Swap the running container's contribution to the utilization
+        // tracker: subtract it under the old token, install the new token,
+        // then add it back under the new token.
+        this.utilizationTracker.subtractContainerResource(
+            updateEvent.getContainer());
+        updateEvent.getContainer().setContainerTokenIdentifier(
+            updateEvent.getUpdatedToken());
+        this.utilizationTracker.addContainerResources(
+            updateEvent.getContainer());
+        getContainersMonitor().handle(
+            new ChangeMonitoringContainerResourceEvent(containerId,
+                updateEvent.getUpdatedToken().getResource()));
+      } else {
+        // Not running yet: only the token needs to change.
+        updateEvent.getContainer().setContainerTokenIdentifier(
+            updateEvent.getUpdatedToken());
+      }
+      try {
+        // Persist change in the state store.
+        this.context.getNMStateStore().storeContainerResourceChanged(
+            containerId,
+            updateEvent.getUpdatedToken().getVersion(),
+            updateEvent.getUpdatedToken().getResource());
+      } catch (IOException e) {
+        LOG.warn("Could not store container [" + containerId + "] resource " +
+            "change..", e);
+      }
+    }
+
+    if (updateEvent.isExecTypeUpdate()) {
+      updateEvent.getContainer().setContainerTokenIdentifier(
+          updateEvent.getUpdatedToken());
+      // If this is a running container.. just change the execution type
+      // and be done with it.
+      if (!runningContainers.containsKey(containerId)) {
+        // Promotion or not (Increase signifies either a promotion
+        // or container size increase)
+        if (updateEvent.isIncrease()) {
+          // Promotion of queued container..
+          if (queuedOpportunisticContainers.remove(containerId) != null) {
+            queuedGuaranteedContainers.put(containerId,
+                updateEvent.getContainer());
+            // Kill opportunistic containers if any to make room for the
+            // promoted container. Only done when a queued opportunistic
+            // container was actually promoted; otherwise running
+            // opportunistic containers would be killed for nothing.
+            killOpportunisticContainers(updateEvent.getContainer());
+          }
+        } else {
+          // Demotion of queued container.. Should not happen too often
+          // since you should not find too many queued guaranteed
+          // containers
+          if (queuedGuaranteedContainers.remove(containerId) != null) {
+            queuedOpportunisticContainers.put(containerId,
+                updateEvent.getContainer());
+          }
+        }
+      }
+    }
+  }
+
+ /**
* Return number of queued containers.
* @return Number of queued containers.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 086cb9b..917eda0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -24,6 +24,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
public enum ContainerSchedulerEventType {
SCHEDULE_CONTAINER,
CONTAINER_COMPLETED,
+ // Producer: ContainerManager - resource or ExecutionType update of a
+ // container (carried by UpdateContainerSchedulerEvent)
+ UPDATE_CONTAINER,
// Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS,
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
new file mode 100644
index 0000000..5384b7e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+ .Container;
+/**
+ * Update Event consumed by the {@link ContainerScheduler}. It carries the
+ * updated container token together with the kind of update, which the
+ * ContainerManager has already determined before dispatching the event.
+ * Instances are immutable.
+ */
+public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
+
+  private final ContainerTokenIdentifier updatedToken;
+  private final boolean isResourceChange;
+  private final boolean isExecTypeUpdate;
+  private final boolean isIncrease;
+
+  /**
+   * Create instance of Event.
+   *
+   * @param originalContainer Original Container.
+   * @param updatedToken Updated Container Token.
+   * @param isResourceChange is this a Resource Change.
+   * @param isExecTypeUpdate is this an ExecTypeUpdate.
+   * @param isIncrease is this a Container Increase (a resource increase or
+   *                   a promotion from OPPORTUNISTIC to GUARANTEED).
+   */
+  public UpdateContainerSchedulerEvent(Container originalContainer,
+      ContainerTokenIdentifier updatedToken, boolean isResourceChange,
+      boolean isExecTypeUpdate, boolean isIncrease) {
+    super(originalContainer, ContainerSchedulerEventType.UPDATE_CONTAINER);
+    this.updatedToken = updatedToken;
+    this.isResourceChange = isResourceChange;
+    this.isExecTypeUpdate = isExecTypeUpdate;
+    this.isIncrease = isIncrease;
+  }
+
+  /**
+   * Get the updated Container Token.
+   *
+   * @return Container Token.
+   */
+  public ContainerTokenIdentifier getUpdatedToken() {
+    return updatedToken;
+  }
+
+  /**
+   * Whether this update changes the container's resource.
+   * @return isResourceChange.
+   */
+  public boolean isResourceChange() {
+    return isResourceChange;
+  }
+
+  /**
+   * Whether this update changes the container's ExecutionType.
+   * @return isExecTypeUpdate.
+   */
+  public boolean isExecTypeUpdate() {
+    return isExecTypeUpdate;
+  }
+
+  /**
+   * Whether this update is an increase (resource increase or promotion).
+   * @return isIncrease.
+   */
+  public boolean isIncrease() {
+    return isIncrease;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index 0c025ac..b8cd7dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -682,7 +682,7 @@ public class TestNodeManagerResync {
try{
try {
updateBarrier.await();
- increaseTokens.add(getContainerToken(targetResource));
+ increaseTokens.add(getContainerToken(targetResource, 1));
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(increaseTokens);
ContainerUpdateResponse updateResponse =
@@ -710,6 +710,15 @@ public class TestNodeManagerResync {
getNMContext().getNodeId(), user, resource,
getNMContext().getContainerTokenSecretManager(), null);
}
+
+    /**
+     * Builds a token for container 0 with the given resource and an
+     * explicit token version.
+     */
+    private Token getContainerToken(Resource resource, int version)
+        throws IOException {
+      ContainerId firstContainer = TestContainerManager.createContainerId(0);
+      return TestContainerManager.createContainerToken(firstContainer,
+          version, DUMMY_RM_IDENTIFIER, getNMContext().getNodeId(), user,
+          resource, getNMContext().getContainerTokenSecretManager(), null);
+    }
}
public static NMContainerStatus createNMContainerStatus(int id,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index d266ac1..6c96a47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -421,6 +421,20 @@ public abstract class BaseContainerManagerTest {
containerTokenIdentifier);
}
+  /**
+   * Creates a GUARANTEED container token with an explicit token version.
+   * Equivalent to the ExecutionType-taking overload called with
+   * {@code ExecutionType.GUARANTEED}.
+   */
+  public static Token createContainerToken(ContainerId cId, int version,
+      long rmIdentifier, NodeId nodeId, String user, Resource resource,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext) throws IOException {
+    return createContainerToken(cId, version, rmIdentifier, nodeId, user,
+        resource, containerTokenSecretManager, logAggregationContext,
+        ExecutionType.GUARANTEED);
+  }
+
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,
@@ -431,8 +445,23 @@ public abstract class BaseContainerManagerTest {
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0, logAggregationContext, null,
ContainerType.TASK, executionType);
- return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
- .retrievePassword(containerTokenIdentifier),
+ return BuilderUtils.newContainerToken(nodeId,
+ containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
+ containerTokenIdentifier);
+ }
+
+ /**
+ * Creates a container token with an explicit token version and
+ * ExecutionType. The token expires 100 seconds after creation, uses a
+ * fixed master key id (123), and its password is obtained from the given
+ * secret manager.
+ */
+ public static Token createContainerToken(ContainerId cId, int version,
+ long rmIdentifier, NodeId nodeId, String user, Resource resource,
+ NMContainerTokenSecretManager containerTokenSecretManager,
+ LogAggregationContext logAggregationContext, ExecutionType executionType)
+ throws IOException {
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(cId, version, nodeId.toString(), user,
+ resource, System.currentTimeMillis() + 100000L, 123, rmIdentifier,
+ Priority.newInstance(0), 0, logAggregationContext, null,
+ ContainerType.TASK, executionType);
+ return BuilderUtils.newContainerToken(nodeId,
+ containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
containerTokenIdentifier);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 24d46b6..9844225 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -80,14 +82,15 @@ import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
-import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -100,6 +103,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -116,10 +120,34 @@ public class TestContainerManager extends BaseContainerManagerTest {
static {
LOG = LogFactory.getLog(TestContainerManager.class);
}
-
+
+  private boolean delayContainers = false; // if true, delay launches 10s
+  /** Spied DefaultContainerExecutor honoring {@link #delayContainers}. */
+  @Override
+  protected ContainerExecutor createContainerExecutor() {
+    DefaultContainerExecutor exec = new DefaultContainerExecutor() {
+      @Override
+      public int launchContainer(ContainerStartContext ctx)
+          throws IOException, ConfigurationException {
+        if (delayContainers) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore interrupt flag
+          }
+        }
+        return super.launchContainer(ctx);
+      }
+    };
+    exec.setConf(conf);
+    return spy(exec);
+  }
+
@Override
@Before
public void setup() throws IOException {
+ conf.setInt( // OPPORTUNISTIC max queue length; set before super.setup()
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
super.setup();
}
@@ -1468,7 +1496,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
Assert.assertEquals(strExceptionMsg,
ContainerManagerImpl.INVALID_NMTOKEN_MSG);
- ContainerManagerImpl spyContainerMgr = Mockito.spy(cMgrImpl);
+ ContainerManagerImpl spyContainerMgr = spy(cMgrImpl);
UserGroupInformation ugInfo = UserGroupInformation.createRemoteUser("a");
Mockito.when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo);
Mockito.when(spyContainerMgr.
@@ -1543,7 +1571,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
// container will have exited, and won't be in RUNNING state
ContainerId cId0 = createContainerId(0);
Token containerToken =
- createContainerToken(cId0, DUMMY_RM_IDENTIFIER,
+ createContainerToken(cId0, 1, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user,
Resource.newInstance(1234, 3),
context.getContainerTokenSecretManager(), null);
@@ -1572,7 +1600,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
if (cId0.equals(entry.getKey())) {
Assert.assertTrue(entry.getValue().getMessage()
.contains("Resource can only be changed when a "
- + "container is in RUNNING state"));
+ + "container is in RUNNING or SCHEDULED state"));
} else if (cId7.equals(entry.getKey())) {
Assert.assertTrue(entry.getValue().getMessage()
.contains("Container " + cId7.toString()
@@ -1585,89 +1613,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
}
@Test
- public void testIncreaseContainerResourceWithInvalidResource() throws Exception {
- containerManager.start();
- File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
- PrintWriter fileWriter = new PrintWriter(scriptFile);
- // Construct the Container-id
- ContainerId cId = createContainerId(0);
- if (Shell.WINDOWS) {
- fileWriter.println("@ping -n 100 127.0.0.1 >nul");
- } else {
- fileWriter.write("\numask 0");
- fileWriter.write("\nexec sleep 100");
- }
- fileWriter.close();
- ContainerLaunchContext containerLaunchContext =
- recordFactory.newRecordInstance(ContainerLaunchContext.class);
- URL resource_alpha =
- URL.fromPath(localFS
- .makeQualified(new Path(scriptFile.getAbsolutePath())));
- LocalResource rsrc_alpha =
- recordFactory.newRecordInstance(LocalResource.class);
- rsrc_alpha.setResource(resource_alpha);
- rsrc_alpha.setSize(-1);
- rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
- rsrc_alpha.setType(LocalResourceType.FILE);
- rsrc_alpha.setTimestamp(scriptFile.lastModified());
- String destinationFile = "dest_file";
- Map<String, LocalResource> localResources =
- new HashMap<String, LocalResource>();
- localResources.put(destinationFile, rsrc_alpha);
- containerLaunchContext.setLocalResources(localResources);
- List<String> commands =
- Arrays.asList(Shell.getRunScriptCommand(scriptFile));
- containerLaunchContext.setCommands(commands);
-
- StartContainerRequest scRequest =
- StartContainerRequest.newInstance(
- containerLaunchContext,
- createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
- user, context.getContainerTokenSecretManager()));
- List<StartContainerRequest> list = new ArrayList<>();
- list.add(scRequest);
- StartContainersRequest allRequests =
- StartContainersRequest.newInstance(list);
- containerManager.startContainers(allRequests);
- // Make sure the container reaches RUNNING state
- BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
- org.apache.hadoop.yarn.server.nodemanager.
- containermanager.container.ContainerState.RUNNING);
- // Construct container resource increase request,
- List<Token> increaseTokens = new ArrayList<>();
- // Add increase request. The increase request should fail
- // as the current resource does not fit in the target resource
- Token containerToken =
- createContainerToken(cId, DUMMY_RM_IDENTIFIER,
- context.getNodeId(), user,
- Resource.newInstance(512, 1),
- context.getContainerTokenSecretManager(), null);
- increaseTokens.add(containerToken);
- ContainerUpdateRequest updateRequest =
- ContainerUpdateRequest.newInstance(increaseTokens);
- ContainerUpdateResponse updateResponse =
- containerManager.updateContainer(updateRequest);
- // Check response
- Assert.assertEquals(
- 0, updateResponse.getSuccessfullyUpdatedContainers().size());
- Assert.assertEquals(1, updateResponse.getFailedRequests().size());
- for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
- .getFailedRequests().entrySet()) {
- if (cId.equals(entry.getKey())) {
- Assert.assertNotNull("Failed message", entry.getValue().getMessage());
- Assert.assertTrue(entry.getValue().getMessage()
- .contains("The target resource "
- + Resource.newInstance(512, 1).toString()
- + " is smaller than the current resource "
- + Resource.newInstance(1024, 1)));
- } else {
- throw new YarnException("Received failed request from wrong"
- + " container: " + entry.getKey().toString());
- }
- }
- }
-
- @Test
public void testChangeContainerResource() throws Exception {
containerManager.start();
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
@@ -1720,7 +1665,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
List<Token> increaseTokens = new ArrayList<>();
// Add increase request.
Resource targetResource = Resource.newInstance(4096, 2);
- Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+ Token containerToken = createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user, targetResource,
context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
@@ -1741,15 +1686,19 @@ public class TestContainerManager extends BaseContainerManagerTest {
// Check status immediately as resource increase is blocking
assertEquals(targetResource, containerStatus.getCapability());
// Simulate a decrease request
- List<org.apache.hadoop.yarn.api.records.Container> containersToDecrease
- = new ArrayList<>();
+ List<Token> decreaseTokens = new ArrayList<>();
targetResource = Resource.newInstance(2048, 2);
- org.apache.hadoop.yarn.api.records.Container decreasedContainer =
- org.apache.hadoop.yarn.api.records.Container
- .newInstance(cId, null, null, targetResource, null, null);
- containersToDecrease.add(decreasedContainer);
- containerManager.handle(
- new CMgrDecreaseContainersResourceEvent(containersToDecrease));
+ Token token = createContainerToken(cId, 2, DUMMY_RM_IDENTIFIER,
+ context.getNodeId(), user, targetResource,
+ context.getContainerTokenSecretManager(), null);
+ decreaseTokens.add(token);
+ updateRequest = ContainerUpdateRequest.newInstance(decreaseTokens);
+ updateResponse = containerManager.updateContainer(updateRequest);
+
+ Assert.assertEquals(
+ 1, updateResponse.getSuccessfullyUpdatedContainers().size());
+ Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
+
// Check status with retry
containerStatus = containerManager
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
@@ -1879,7 +1828,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ContainerLaunchContext spyContainerLaunchContext =
- Mockito.spy(containerLaunchContext);
+ spy(containerLaunchContext);
Mockito.when(spyContainerLaunchContext.getLocalResources())
.thenReturn(localResources);
@@ -1924,7 +1873,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ContainerLaunchContext spyContainerLaunchContext =
- Mockito.spy(containerLaunchContext);
+ spy(containerLaunchContext);
Mockito.when(spyContainerLaunchContext.getLocalResources())
.thenReturn(localResources);
@@ -1969,7 +1918,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ContainerLaunchContext spyContainerLaunchContext =
- Mockito.spy(containerLaunchContext);
+ spy(containerLaunchContext);
Mockito.when(spyContainerLaunchContext.getLocalResources())
.thenReturn(localResources);
@@ -1996,4 +1945,122 @@ public class TestContainerManager extends BaseContainerManagerTest {
Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
.contains("Null resource visibility for local resource"));
}
+
+  /** Verifies a RUNNING OPPORTUNISTIC container can become GUARANTEED. */
+  @Test
+  public void testContainerUpdateExecTypeOpportunisticToGuaranteed()
+      throws IOException, YarnException, InterruptedException {
+    delayContainers = true;
+    containerManager.start();
+    // Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    // Start the container with ExecutionType OPPORTUNISTIC.
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+            createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+                context.getNodeId(), user, BuilderUtils.newResource(512, 1),
+                context.getContainerTokenSecretManager(), null,
+                ExecutionType.OPPORTUNISTIC));
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+    // Make sure the container reaches RUNNING state
+    BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
+        org.apache.hadoop.yarn.server.nodemanager.
+            containermanager.container.ContainerState.RUNNING);
+    // Update token: same resource, ExecutionType changed to GUARANTEED.
+    List<Token> updateTokens = new ArrayList<>();
+    Token containerToken =
+        createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED);
+    updateTokens.add(containerToken);
+    ContainerUpdateRequest updateRequest =
+        ContainerUpdateRequest.newInstance(updateTokens);
+    ContainerUpdateResponse updateResponse =
+        containerManager.updateContainer(updateRequest);
+
+    Assert.assertEquals(
+        1, updateResponse.getSuccessfullyUpdatedContainers().size());
+    Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
+
+    // The container must still be RUNNING, now as GUARANTEED.
+    List<ContainerId> statList = new ArrayList<>();
+    statList.add(cId);
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    Assert.assertEquals(1, containerStatuses.size());
+    for (ContainerStatus status : containerStatuses) {
+      Assert.assertEquals(
+          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+          status.getState());
+      Assert.assertEquals(ExecutionType.GUARANTEED, status.getExecutionType());
+    }
+  }
+
+  /** Verifies a RUNNING GUARANTEED container can become OPPORTUNISTIC. */
+  @Test
+  public void testContainerUpdateExecTypeGuaranteedToOpportunistic()
+      throws IOException, YarnException, InterruptedException {
+    delayContainers = true;
+    containerManager.start();
+    // Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    // Start the container with an explicit ExecutionType of GUARANTEED.
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+            createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+                context.getNodeId(), user, BuilderUtils.newResource(512, 1),
+                context.getContainerTokenSecretManager(), null,
+                ExecutionType.GUARANTEED));
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+    // Make sure the container reaches RUNNING state
+    BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
+        org.apache.hadoop.yarn.server.nodemanager.
+            containermanager.container.ContainerState.RUNNING);
+    // Update token: same resource, ExecutionType changed to OPPORTUNISTIC.
+    List<Token> updateTokens = new ArrayList<>();
+    Token containerToken =
+        createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC);
+    updateTokens.add(containerToken);
+    ContainerUpdateRequest updateRequest =
+        ContainerUpdateRequest.newInstance(updateTokens);
+    ContainerUpdateResponse updateResponse =
+        containerManager.updateContainer(updateRequest);
+
+    Assert.assertEquals(
+        1, updateResponse.getSuccessfullyUpdatedContainers().size());
+    Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
+
+    // The container must still be RUNNING, now as OPPORTUNISTIC.
+    List<ContainerId> statList = new ArrayList<>();
+    statList.add(cId);
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    Assert.assertEquals(1, containerStatuses.size());
+    for (ContainerStatus status : containerStatuses) {
+      Assert.assertEquals(
+          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+          status.getState());
+      assertEquals(ExecutionType.OPPORTUNISTIC, status.getExecutionType());
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d7be1d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index d2bd79c..224e99c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -652,7 +652,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
final List<Token> increaseTokens = new ArrayList<Token>();
// add increase request
Token containerToken = TestContainerManager.createContainerToken(
- cid, 0, context.getNodeId(), user.getShortUserName(),
+ cid, 1, 0, context.getNodeId(), user.getShortUserName(),
capability, context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
final ContainerUpdateRequest updateRequest =
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org