You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/14 05:11:58 UTC
[19/22] hadoop git commit: YARN-1645. ContainerManager implementation
to support container resizing. Contributed by Meng Ding & Wangda Tan
YARN-1645. ContainerManager implementation to support container resizing. Contributed by Meng Ding & Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47071f89
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47071f89
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47071f89
Branch: refs/heads/YARN-1197
Commit: 47071f89babef34113acacccb815048ddb798e64
Parents: 42dd09e
Author: Jian He <ji...@apache.org>
Authored: Tue Jul 21 16:10:40 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Sun Sep 13 19:51:11 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../CMgrDecreaseContainersResourceEvent.java | 37 ++++
.../nodemanager/ContainerManagerEventType.java | 1 +
.../containermanager/ContainerManagerImpl.java | 180 ++++++++++++++++--
.../container/ChangeContainerResourceEvent.java | 36 ++++
.../container/ContainerEventType.java | 4 +
.../nodemanager/DummyContainerManager.java | 6 +-
.../TestContainerManagerWithLCE.java | 22 +++
.../BaseContainerManagerTest.java | 43 ++++-
.../containermanager/TestContainerManager.java | 190 ++++++++++++++++++-
10 files changed, 486 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e168774..8ac1d78 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -196,6 +196,9 @@ Release 2.8.0 - UNRELEASED
YARN-1449. AM-NM protocol changes to support container resizing.
(Meng Ding & Wangda Tan via jianhe)
+ YARN-1645. ContainerManager implementation to support container resizing.
+ (Meng Ding & Wangda Tan via jianhe)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java
new file mode 100644
index 0000000..9479d0b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import java.util.List;
+
+public class CMgrDecreaseContainersResourceEvent extends ContainerManagerEvent {
+
+ private final List<Container> containersToDecrease;
+
+ public CMgrDecreaseContainersResourceEvent(List<Container>
+ containersToDecrease) {
+ super(ContainerManagerEventType.DECREASE_CONTAINERS_RESOURCE);
+ this.containersToDecrease = containersToDecrease;
+ }
+
+ public List<Container> getContainersToDecrease() {
+ return this.containersToDecrease;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
index 4278ce0..fcb0252 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
@@ -21,4 +21,5 @@ package org.apache.hadoop.yarn.server.nodemanager;
public enum ContainerManagerEventType {
FINISH_APPS,
FINISH_CONTAINERS,
+ DECREASE_CONTAINERS_RESOURCE
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index ba1aec2..890a4e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
@@ -95,6 +96,7 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -113,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ChangeContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
@@ -141,6 +144,7 @@ import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManagementProtocol,
@@ -681,33 +685,45 @@ public class ContainerManagerImpl extends CompositeService implements
/**
* @param containerTokenIdentifier
- * of the container to be started
+ * of the container to be started, or whose resource is to be increased
* @throws YarnException
*/
@Private
@VisibleForTesting
- protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
- ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
+ protected void authorizeStartAndResourceIncreaseRequest(
+ NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ boolean startRequest)
+ throws YarnException {
if (nmTokenIdentifier == null) {
throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
}
if (containerTokenIdentifier == null) {
throw RPCUtil.getRemoteException(INVALID_CONTAINERTOKEN_MSG);
}
+ /*
+ * Check the following:
+ * 1. The request comes from the same application attempt
+ * 2. The request possesses a container token that has not expired
+ * 3. The request possesses a container token that is granted by a known RM
+ */
ContainerId containerId = containerTokenIdentifier.getContainerID();
String containerIDStr = containerId.toString();
boolean unauthorized = false;
StringBuilder messageBuilder =
- new StringBuilder("Unauthorized request to start container. ");
+ new StringBuilder("Unauthorized request to " + (startRequest ?
+ "start container." : "increase container resource."));
if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().
equals(containerId.getApplicationAttemptId().getApplicationId())) {
unauthorized = true;
messageBuilder.append("\nNMToken for application attempt : ")
.append(nmTokenIdentifier.getApplicationAttemptId())
- .append(" was used for starting container with container token")
+ .append(" was used for "
+ + (startRequest ? "starting " : "increasing resource of ")
+ + "container with container token")
.append(" issued for application attempt : ")
.append(containerId.getApplicationAttemptId());
- } else if (!this.context.getContainerTokenSecretManager()
+ } else if (startRequest && !this.context.getContainerTokenSecretManager()
.isValidStartContainerRequest(containerTokenIdentifier)) {
// Is the container being relaunched? Or RPC layer let startCall with
// tokens generated off old-secret through?
@@ -729,6 +745,14 @@ public class ContainerManagerImpl extends CompositeService implements
LOG.error(msg);
throw RPCUtil.getRemoteException(msg);
}
+ if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
+ .getRMIdentifier()) {
+ // Is the container coming from an unknown RM?
+ StringBuilder sb = new StringBuilder("\nContainer ");
+ sb.append(containerTokenIdentifier.getContainerID().toString())
+ .append(" rejected as it is allocated by a previous RM");
+ throw new InvalidContainerException(sb.toString());
+ }
}
/**
@@ -745,7 +769,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
- authorizeUser(remoteUgi,nmTokenIdentifier);
+ authorizeUser(remoteUgi, nmTokenIdentifier);
List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
@@ -844,16 +868,8 @@ public class ContainerManagerImpl extends CompositeService implements
* belongs to correct Node Manager (part of retrieve password). c) It has
* correct RMIdentifier. d) It is not expired.
*/
- authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);
-
- if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
- .getRMIdentifier()) {
- // Is the container coming from unknown RM
- StringBuilder sb = new StringBuilder("\nContainer ");
- sb.append(containerTokenIdentifier.getContainerID().toString())
- .append(" rejected as it is allocated by a previous RM");
- throw new InvalidContainerException(sb.toString());
- }
+ authorizeStartAndResourceIncreaseRequest(
+ nmTokenIdentifier, containerTokenIdentifier, true);
// update NMToken
updateNMTokenIdentifier(nmTokenIdentifier);
@@ -960,9 +976,118 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest requests)
- throws YarnException, IOException {
- // To be implemented in YARN-1645
- return null;
+ throws YarnException, IOException {
+ if (blockNewContainerRequests.get()) {
+ throw new NMNotYetReadyException(
+ "Rejecting container resource increase as NodeManager has not"
+ + " yet connected with ResourceManager");
+ }
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
+ authorizeUser(remoteUgi, nmTokenIdentifier);
+ List<ContainerId> successfullyIncreasedContainers
+ = new ArrayList<ContainerId>();
+ Map<ContainerId, SerializedException> failedContainers =
+ new HashMap<ContainerId, SerializedException>();
+ // Process container resource increase requests
+ for (org.apache.hadoop.yarn.api.records.Token token :
+ requests.getContainersToIncrease()) {
+ ContainerId containerId = null;
+ try {
+ if (token.getIdentifier() == null) {
+ throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+ }
+ ContainerTokenIdentifier containerTokenIdentifier =
+ BuilderUtils.newContainerTokenIdentifier(token);
+ verifyAndGetContainerTokenIdentifier(token,
+ containerTokenIdentifier);
+ authorizeStartAndResourceIncreaseRequest(
+ nmTokenIdentifier, containerTokenIdentifier, false);
+ containerId = containerTokenIdentifier.getContainerID();
+ // Reuse the startContainer logic to update NMToken,
+ // as container resource increase request will have come with
+ // an updated NMToken.
+ updateNMTokenIdentifier(nmTokenIdentifier);
+ Resource resource = containerTokenIdentifier.getResource();
+ changeContainerResourceInternal(containerId, resource, true);
+ successfullyIncreasedContainers.add(containerId);
+ } catch (YarnException | InvalidToken e) {
+ failedContainers.put(containerId, SerializedException.newInstance(e));
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e);
+ }
+ }
+ return IncreaseContainersResourceResponse.newInstance(
+ successfullyIncreasedContainers, failedContainers);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void changeContainerResourceInternal(
+ ContainerId containerId, Resource targetResource, boolean increase)
+ throws YarnException, IOException {
+ Container container = context.getContainers().get(containerId);
+ // Check container existence
+ if (container == null) {
+ if (nodeStatusUpdater.isContainerRecentlyStopped(containerId)) {
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " was recently stopped on node manager.");
+ } else {
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " is not handled by this NodeManager");
+ }
+ }
+ // Check container state
+ org.apache.hadoop.yarn.server.nodemanager.
+ containermanager.container.ContainerState currentState =
+ container.getContainerState();
+ if (currentState != org.apache.hadoop.yarn.server.
+ nodemanager.containermanager.container.ContainerState.RUNNING) {
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " is in " + currentState.name() + " state."
+ + " Resource can only be changed when a container is in"
+ + " RUNNING state");
+ }
+ // Check validity of the target resource.
+ Resource currentResource = container.getResource();
+ if (currentResource.equals(targetResource)) {
+ LOG.warn("Unable to change resource for container "
+ + containerId.toString()
+ + ". The target resource "
+ + targetResource.toString()
+ + " is the same as the current resource");
+ return;
+ }
+ if (increase && !Resources.fitsIn(currentResource, targetResource)) {
+ throw RPCUtil.getRemoteException("Unable to increase resource for "
+ + "container " + containerId.toString()
+ + ". The target resource "
+ + targetResource.toString()
+ + " is smaller than the current resource "
+ + currentResource.toString());
+ }
+ if (!increase &&
+ (!Resources.fitsIn(Resources.none(), targetResource)
+ || !Resources.fitsIn(targetResource, currentResource))) {
+ throw RPCUtil.getRemoteException("Unable to decrease resource for "
+ + "container " + containerId.toString()
+ + ". The target resource "
+ + targetResource.toString()
+ + " is not smaller than the current resource "
+ + currentResource.toString());
+ }
+ this.readLock.lock();
+ try {
+ if (!serviceStopped) {
+ dispatcher.getEventHandler().handle(new ChangeContainerResourceEvent(
+ containerId, targetResource));
+ } else {
+ throw new YarnException(
+ "Unable to change container resource as the NodeManager is "
+ + "in the process of shutting down");
+ }
+ } finally {
+ this.readLock.unlock();
+ }
}
@Private
@@ -1182,6 +1307,21 @@ public class ContainerManagerImpl extends CompositeService implements
"Container Killed by ResourceManager"));
}
break;
+ case DECREASE_CONTAINERS_RESOURCE:
+ CMgrDecreaseContainersResourceEvent containersDecreasedEvent =
+ (CMgrDecreaseContainersResourceEvent) event;
+ for (org.apache.hadoop.yarn.api.records.Container container
+ : containersDecreasedEvent.getContainersToDecrease()) {
+ try {
+ changeContainerResourceInternal(container.getId(),
+ container.getResource(), false);
+ } catch (YarnException e) {
+ LOG.error("Unable to decrease container resource", e);
+ } catch (IOException e) {
+ LOG.error("Unable to update container resource in store", e);
+ }
+ }
+ break;
default:
throw new YarnRuntimeException(
"Got an unknown ContainerManagerEvent type: " + event.getType());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java
new file mode 100644
index 0000000..3944a3d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class ChangeContainerResourceEvent extends ContainerEvent {
+
+ private Resource resource;
+
+ public ChangeContainerResourceEvent(ContainerId c, Resource resource) {
+ super(c, ContainerEventType.CHANGE_CONTAINER_RESOURCE);
+ this.resource = resource;
+ }
+
+ public Resource getResource() {
+ return this.resource;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index 5622f8c..dc712bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -25,6 +25,10 @@ public enum ContainerEventType {
KILL_CONTAINER,
UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE,
+ CHANGE_CONTAINER_RESOURCE,
+
+ // Producer: ContainerMonitor
+ CONTAINER_RESOURCE_CHANGED,
// DownloadManager
CONTAINER_INITED,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index 349340b..3ff04d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -191,8 +191,10 @@ public class DummyContainerManager extends ContainerManagerImpl {
}
@Override
- protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
- ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
+ protected void authorizeStartAndResourceIncreaseRequest(
+ NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ boolean startRequest) throws YarnException {
// do nothing
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index a47e7f7..9a05278 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -189,6 +189,28 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
super.testStartContainerFailureWithUnknownAuxService();
}
+ @Override
+ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testIncreaseContainerResourceWithInvalidRequests");
+ super.testIncreaseContainerResourceWithInvalidRequests();
+ }
+
+ @Override
+ public void testIncreaseContainerResourceWithInvalidResource() throws Exception {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testIncreaseContainerResourceWithInvalidResource");
+ super.testIncreaseContainerResourceWithInvalidResource();
+ }
+
private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 2810662..3938342 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -209,12 +209,13 @@ public abstract class BaseContainerManagerTest {
// do nothing
}
@Override
- protected void authorizeStartRequest(
- NMTokenIdentifier nmTokenIdentifier,
- ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
- // do nothing
- }
-
+ protected void authorizeStartAndResourceIncreaseRequest(
+ NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ boolean startRequest) throws YarnException {
+ // do nothing
+ }
+
@Override
protected void updateNMTokenIdentifier(
NMTokenIdentifier nmTokenIdentifier) throws InvalidToken {
@@ -310,4 +311,34 @@ public abstract class BaseContainerManagerTest {
app.getApplicationState().equals(finalState));
}
+ public static void waitForNMContainerState(ContainerManagerImpl
+ containerManager, ContainerId containerID,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager
+ .container.ContainerState finalState)
+ throws InterruptedException, YarnException, IOException {
+ waitForNMContainerState(containerManager, containerID, finalState, 20);
+ }
+
+ public static void waitForNMContainerState(ContainerManagerImpl
+ containerManager, ContainerId containerID,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager
+ .container.ContainerState finalState, int timeOutMax)
+ throws InterruptedException, YarnException, IOException {
+ Container container =
+ containerManager.getContext().getContainers().get(containerID);
+ org.apache.hadoop.yarn.server.nodemanager
+ .containermanager.container.ContainerState currentState =
+ container.getContainerState();
+ int timeoutSecs = 0;
+ while (!currentState.equals(finalState)
+ && timeoutSecs++ < timeOutMax) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for NM container to get into state " + finalState
+ + ". Current state is " + currentState);
+ currentState = container.getContainerState();
+ }
+ LOG.info("Container state is " + currentState);
+ Assert.assertEquals("ContainerState is not correct (timedout)",
+ finalState, currentState);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47071f89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index e508424..e2f12ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
@@ -87,6 +90,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.junit.Assert.assertEquals;
+
public class TestContainerManager extends BaseContainerManagerTest {
public TestContainerManager() throws UnsupportedFileSystemException {
@@ -803,7 +808,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
metrics, dirsHandler);
String strExceptionMsg = "";
try {
- cMgrImpl.authorizeStartRequest(null, new ContainerTokenIdentifier());
+ cMgrImpl.authorizeStartAndResourceIncreaseRequest(
+ null, new ContainerTokenIdentifier(), true);
} catch(YarnException ye) {
strExceptionMsg = ye.getMessage();
}
@@ -812,7 +818,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
strExceptionMsg = "";
try {
- cMgrImpl.authorizeStartRequest(new NMTokenIdentifier(), null);
+ cMgrImpl.authorizeStartAndResourceIncreaseRequest(
+ new NMTokenIdentifier(), null, true);
} catch(YarnException ye) {
strExceptionMsg = ye.getMessage();
}
@@ -878,6 +885,167 @@ public class TestContainerManager extends BaseContainerManagerTest {
ContainerManagerImpl.INVALID_CONTAINERTOKEN_MSG);
}
+ @Test
+ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception {
+ containerManager.start();
+ // Start 4 containers 0..3 with default resource (1024, 1)
+ List<StartContainerRequest> list = new ArrayList<>();
+ ContainerLaunchContext containerLaunchContext = recordFactory
+ .newRecordInstance(ContainerLaunchContext.class);
+ for (int i = 0; i < 4; i++) {
+ ContainerId cId = createContainerId(i);
+ long identifier = DUMMY_RM_IDENTIFIER;
+ Token containerToken = createContainerToken(cId, identifier,
+ context.getNodeId(), user, context.getContainerTokenSecretManager());
+ StartContainerRequest request = StartContainerRequest.newInstance(
+ containerLaunchContext, containerToken);
+ list.add(request);
+ }
+ StartContainersRequest requestList = StartContainersRequest
+ .newInstance(list);
+ StartContainersResponse response = containerManager
+ .startContainers(requestList);
+
+ Assert.assertEquals(4, response.getSuccessfullyStartedContainers().size());
+ int i = 0;
+ for (ContainerId id : response.getSuccessfullyStartedContainers()) {
+ Assert.assertEquals(i, id.getContainerId());
+ i++;
+ }
+
+ Thread.sleep(2000);
+ // Construct the container resource increase request.
+ List<Token> increaseTokens = new ArrayList<Token>();
+ // Add increase request for container-0, the request will fail as the
+ // container will have exited, and won't be in RUNNING state
+ ContainerId cId0 = createContainerId(0);
+ Token containerToken =
+ createContainerToken(cId0, DUMMY_RM_IDENTIFIER,
+ context.getNodeId(), user,
+ Resource.newInstance(1234, 3),
+ context.getContainerTokenSecretManager(), null);
+ increaseTokens.add(containerToken);
+ // Add increase request for container-7, the request will fail as the
+ // container does not exist
+ ContainerId cId7 = createContainerId(7);
+ containerToken =
+ createContainerToken(cId7, DUMMY_RM_IDENTIFIER,
+ context.getNodeId(), user,
+ Resource.newInstance(1234, 3),
+ context.getContainerTokenSecretManager(), null);
+ increaseTokens.add(containerToken);
+
+ IncreaseContainersResourceRequest increaseRequest =
+ IncreaseContainersResourceRequest
+ .newInstance(increaseTokens);
+ IncreaseContainersResourceResponse increaseResponse =
+ containerManager.increaseContainersResource(increaseRequest);
+ // Check response
+ Assert.assertEquals(
+ 0, increaseResponse.getSuccessfullyIncreasedContainers().size());
+ Assert.assertEquals(2, increaseResponse.getFailedRequests().size());
+ for (Map.Entry<ContainerId, SerializedException> entry : increaseResponse
+ .getFailedRequests().entrySet()) {
+ Assert.assertNotNull("Failed message", entry.getValue().getMessage());
+ if (cId0.equals(entry.getKey())) {
+ Assert.assertTrue(entry.getValue().getMessage()
+ .contains("Resource can only be changed when a "
+ + "container is in RUNNING state"));
+ } else if (cId7.equals(entry.getKey())) {
+ Assert.assertTrue(entry.getValue().getMessage()
+ .contains("Container " + cId7.toString()
+ + " is not handled by this NodeManager"));
+ } else {
+ throw new YarnException("Received failed request from wrong"
+ + " container: " + entry.getKey().toString());
+ }
+ }
+ }
+
+ @Test
+ public void testIncreaseContainerResourceWithInvalidResource() throws Exception {
+ containerManager.start();
+ File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+ PrintWriter fileWriter = new PrintWriter(scriptFile);
+ // Construct the Container-id
+ ContainerId cId = createContainerId(0);
+ if (Shell.WINDOWS) {
+ fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+ } else {
+ fileWriter.write("\numask 0");
+ fileWriter.write("\nexec sleep 100");
+ }
+ fileWriter.close();
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ URL resource_alpha =
+ ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
+ LocalResource rsrc_alpha =
+ recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(scriptFile.lastModified());
+ String destinationFile = "dest_file";
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ localResources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.setLocalResources(localResources);
+ List<String> commands =
+ Arrays.asList(Shell.getRunScriptCommand(scriptFile));
+ containerLaunchContext.setCommands(commands);
+
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+ user, context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
+ // Make sure the container reaches RUNNING state
+ BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
+ org.apache.hadoop.yarn.server.nodemanager.
+ containermanager.container.ContainerState.RUNNING);
+ // Construct the container resource increase request.
+ List<Token> increaseTokens = new ArrayList<Token>();
+ // Add increase request. The increase request should fail
+ // as the current resource does not fit in the target resource
+ Token containerToken =
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+ context.getNodeId(), user,
+ Resource.newInstance(512, 1),
+ context.getContainerTokenSecretManager(), null);
+ increaseTokens.add(containerToken);
+ IncreaseContainersResourceRequest increaseRequest =
+ IncreaseContainersResourceRequest
+ .newInstance(increaseTokens);
+ IncreaseContainersResourceResponse increaseResponse =
+ containerManager.increaseContainersResource(increaseRequest);
+ // Check response
+ Assert.assertEquals(
+ 0, increaseResponse.getSuccessfullyIncreasedContainers().size());
+ Assert.assertEquals(1, increaseResponse.getFailedRequests().size());
+ for (Map.Entry<ContainerId, SerializedException> entry : increaseResponse
+ .getFailedRequests().entrySet()) {
+ if (cId.equals(entry.getKey())) {
+ Assert.assertNotNull("Failed message", entry.getValue().getMessage());
+ Assert.assertTrue(entry.getValue().getMessage()
+ .contains("The target resource "
+ + Resource.newInstance(512, 1).toString()
+ + " is smaller than the current resource "
+ + Resource.newInstance(1024, 1)));
+ } else {
+ throw new YarnException("Received failed request from wrong"
+ + " container: " + entry.getKey().toString());
+ }
+ }
+ }
+
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager)
@@ -892,15 +1060,21 @@ public class TestContainerManager extends BaseContainerManagerTest {
LogAggregationContext logAggregationContext)
throws IOException {
Resource r = BuilderUtils.newResource(1024, 1);
+ return createContainerToken(cId, rmIdentifier, nodeId, user, r,
+ containerTokenSecretManager, logAggregationContext);
+ }
+
+ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+ NodeId nodeId, String user, Resource resource,
+ NMContainerTokenSecretManager containerTokenSecretManager,
+ LogAggregationContext logAggregationContext)
+ throws IOException {
ContainerTokenIdentifier containerTokenIdentifier =
- new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
+ new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0, logAggregationContext, null);
- Token containerToken =
- BuilderUtils
- .newContainerToken(nodeId, containerTokenSecretManager
- .retrievePassword(containerTokenIdentifier),
+ return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
+ .retrievePassword(containerTokenIdentifier),
containerTokenIdentifier);
- return containerToken;
}
}