You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/04 23:42:53 UTC
[48/50] [abbrv] hadoop git commit: YARN-1643. Make ContainersMonitor
support changing monitoring size of an allocated container. Contributed by
Meng Ding and Wangda Tan
YARN-1643. Make ContainersMonitor support changing monitoring size of an allocated container. Contributed by Meng Ding and Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5d70f938
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5d70f938
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5d70f938
Branch: refs/heads/YARN-1197
Commit: 5d70f93831c472ba82d4699a66b33c40780facf6
Parents: 508da11
Author: Jian He <ji...@apache.org>
Authored: Wed Aug 5 15:19:33 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Sep 4 11:48:04 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../monitor/ContainersMonitorImpl.java | 207 ++++++++++------
.../TestContainerManagerWithLCE.java | 11 +
.../containermanager/TestContainerManager.java | 96 +++++++
.../monitor/MockResourceCalculatorPlugin.java | 69 ++++++
.../MockResourceCalculatorProcessTree.java | 57 +++++
.../TestContainersMonitorResourceChange.java | 248 +++++++++++++++++++
7 files changed, 615 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d70f938/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 49eb7b9..a524bbc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -202,6 +202,9 @@ Release 2.8.0 - UNRELEASED
YARN-3867. ContainerImpl changes to support container resizing. (Meng Ding
via jianhe)
+ YARN-1643. Make ContainersMonitor support changing monitoring size of an
+ allocated container. (Meng Ding and Wangda Tan)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d70f938/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index afb51ad..b3839d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -18,13 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -32,12 +30,14 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
@@ -56,16 +56,16 @@ public class ContainersMonitorImpl extends AbstractService implements
private boolean containerMetricsEnabled;
private long containerMetricsPeriodMs;
- final List<ContainerId> containersToBeRemoved;
- final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
- Map<ContainerId, ProcessTreeInfo> trackingContainers =
- new HashMap<ContainerId, ProcessTreeInfo>();
+ @VisibleForTesting
+ final Map<ContainerId, ProcessTreeInfo> trackingContainers =
+ new ConcurrentHashMap<>();
- final ContainerExecutor containerExecutor;
+ private final ContainerExecutor containerExecutor;
private final Dispatcher eventDispatcher;
private final Context context;
private ResourceCalculatorPlugin resourceCalculatorPlugin;
private Configuration conf;
+ // Virtual-to-physical memory ratio read from NM_VMEM_PMEM_RATIO during
+ // initialization. Kept per instance (not static): it is assigned from
+ // this monitor's own configuration, so a static field would let several
+ // monitor instances in one JVM overwrite each other's value.
+ private float vmemRatio;
private Class<? extends ResourceCalculatorProcessTree> processTreeClass;
private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
@@ -82,6 +82,8 @@ public class ContainersMonitorImpl extends AbstractService implements
private ResourceUtilization containersUtilization;
+ private volatile boolean stopped = false;
+
public ContainersMonitorImpl(ContainerExecutor exec,
AsyncDispatcher dispatcher, Context context) {
super("containers-monitor");
@@ -90,8 +92,6 @@ public class ContainersMonitorImpl extends AbstractService implements
this.eventDispatcher = dispatcher;
this.context = context;
- this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
- this.containersToBeRemoved = new ArrayList<ContainerId>();
this.monitoringThread = new MonitoringThread();
this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
@@ -140,7 +140,7 @@ public class ContainersMonitorImpl extends AbstractService implements
this.maxVCoresAllottedForContainers = configuredVCoresForContainers;
// ///////// Virtual memory configuration //////
- float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
Preconditions.checkArgument(vmemRatio > 0.99f,
YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
@@ -218,6 +218,7 @@ public class ContainersMonitorImpl extends AbstractService implements
@Override
protected void serviceStop() throws Exception {
if (containersMonitorEnabled) {
+ stopped = true;
this.monitoringThread.interrupt();
try {
this.monitoringThread.join();
@@ -228,7 +229,8 @@ public class ContainersMonitorImpl extends AbstractService implements
super.serviceStop();
}
- private static class ProcessTreeInfo {
+ @VisibleForTesting
+ static class ProcessTreeInfo {
private ContainerId containerId;
private String pid;
private ResourceCalculatorProcessTree pTree;
@@ -267,26 +269,43 @@ public class ContainersMonitorImpl extends AbstractService implements
this.pTree = pTree;
}
- public long getVmemLimit() {
+ /**
+ * @return Virtual memory limit for the process tree in bytes
+ */
+ public synchronized long getVmemLimit() {
return this.vmemLimit;
}
/**
* @return Physical memory limit for the process tree in bytes
*/
- public long getPmemLimit() {
+ public synchronized long getPmemLimit() {
return this.pmemLimit;
}
/**
- * Return the number of cpu vcores assigned
- * @return
+ * @return Number of cpu vcores assigned
*/
- public int getCpuVcores() {
+ public synchronized int getCpuVcores() {
return this.cpuVcores;
}
- }
+ /**
+ * Replaces all three enforced limits of this process tree. The update runs
+ * while holding this object's lock, as do the synchronized getters.
+ *
+ * @param pmemLimit new physical memory limit for the process tree, in bytes
+ * @param vmemLimit new virtual memory limit for the process tree, in bytes
+ * @param cpuVcores new number of cpu vcores assigned
+ */
+ public synchronized void setResourceLimit(
+ long pmemLimit, long vmemLimit, int cpuVcores) {
+ this.cpuVcores = cpuVcores;
+ this.vmemLimit = vmemLimit;
+ this.pmemLimit = pmemLimit;
+ }
+ }
/**
* Check whether a container's process tree's current memory usage is over
@@ -359,8 +378,7 @@ public class ContainersMonitorImpl extends AbstractService implements
@Override
public void run() {
- while (true) {
-
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
// Print the processTrees for debugging.
if (LOG.isDebugEnabled()) {
StringBuilder tmp = new StringBuilder("[ ");
@@ -372,31 +390,6 @@ public class ContainersMonitorImpl extends AbstractService implements
+ tmp.substring(0, tmp.length()) + "]");
}
- // Add new containers
- synchronized (containersToBeAdded) {
- for (Entry<ContainerId, ProcessTreeInfo> entry : containersToBeAdded
- .entrySet()) {
- ContainerId containerId = entry.getKey();
- ProcessTreeInfo processTreeInfo = entry.getValue();
- LOG.info("Starting resource-monitoring for " + containerId);
- trackingContainers.put(containerId, processTreeInfo);
- }
- containersToBeAdded.clear();
- }
-
- // Remove finished containers
- synchronized (containersToBeRemoved) {
- for (ContainerId containerId : containersToBeRemoved) {
- if (containerMetricsEnabled) {
- ContainerMetrics.forContainer(
- containerId, containerMetricsPeriodMs).finished();
- }
- trackingContainers.remove(containerId);
- LOG.info("Stopping resource-monitoring for " + containerId);
- }
- containersToBeRemoved.clear();
- }
-
// Temporary structure to calculate the total resource utilization of
// the containers
ResourceUtilization trackedContainersUtilization =
@@ -408,10 +401,8 @@ public class ContainersMonitorImpl extends AbstractService implements
long pmemByAllContainers = 0;
long cpuUsagePercentPerCoreByAllContainers = 0;
long cpuUsageTotalCoresByAllContainers = 0;
- for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
- trackingContainers.entrySet().iterator(); it.hasNext();) {
-
- Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
+ for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers
+ .entrySet()) {
ContainerId containerId = entry.getKey();
ProcessTreeInfo ptInfo = entry.getValue();
try {
@@ -435,11 +426,6 @@ public class ContainersMonitorImpl extends AbstractService implements
if (containerMetricsEnabled) {
ContainerMetrics usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs);
- int cpuVcores = ptInfo.getCpuVcores();
- final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20);
- final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20);
- usageMetrics.recordResourceLimit(
- vmemLimit, pmemLimit, cpuVcores);
usageMetrics.recordProcessId(pId);
}
}
@@ -548,7 +534,7 @@ public class ContainersMonitorImpl extends AbstractService implements
eventDispatcher.getEventHandler().handle(
new ContainerKillEvent(containerId,
containerExitStatus, msg));
- it.remove();
+ trackingContainers.remove(containerId);
LOG.info("Removed ProcessTree with root " + pId);
}
} catch (Exception e) {
@@ -605,6 +591,60 @@ public class ContainersMonitorImpl extends AbstractService implements
}
}
+ /**
+ * Records a new resource allocation on a container held in the NM context.
+ * If the context has no such container, a warning is logged and nothing
+ * else happens.
+ *
+ * @param containerId container whose resource is being changed
+ * @param resource new resource to set on the container
+ */
+ private void changeContainerResource(
+ ContainerId containerId, Resource resource) {
+ Container container = context.getContainers().get(containerId);
+ // Nothing to update if the NM context does not know this container.
+ if (container == null) {
+ LOG.warn("Container " + containerId + " does not exist");
+ return;
+ }
+ container.setResource(resource);
+ }
+
+ /**
+ * Records container metrics for a monitoring event. Does nothing when the
+ * event is null or container metrics are disabled.
+ *
+ * @param monitoringEvent start, stop, or resource-change event to record
+ */
+ private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
+ if (monitoringEvent == null || !containerMetricsEnabled) {
+ return;
+ }
+
+ ContainerMetrics metrics = ContainerMetrics
+ .forContainer(monitoringEvent.getContainerId(), containerMetricsPeriodMs);
+
+ switch (monitoringEvent.getType()) {
+ case START_MONITORING_CONTAINER: {
+ ContainerStartMonitoringEvent start =
+ (ContainerStartMonitoringEvent) monitoringEvent;
+ metrics.recordStateChangeDurations(
+ start.getLaunchDuration(),
+ start.getLocalizationDuration());
+ // Start-event limits are byte counts; shift by 20 to record MB.
+ int startVcores = start.getCpuVcores();
+ int startVmemMb = (int) (start.getVmemLimit() >> 20);
+ int startPmemMb = (int) (start.getPmemLimit() >> 20);
+ metrics.recordResourceLimit(startVmemMb, startPmemMb, startVcores);
+ break;
+ }
+ case STOP_MONITORING_CONTAINER:
+ metrics.finished();
+ break;
+ case CHANGE_MONITORING_CONTAINER_RESOURCE: {
+ Resource target =
+ ((ChangeMonitoringContainerResourceEvent) monitoringEvent)
+ .getResource();
+ int targetPmemMb = target.getMemory();
+ // The vmem limit is derived from pmem via the configured ratio.
+ metrics.recordResourceLimit((int) (targetPmemMb * vmemRatio),
+ targetPmemMb, target.getVirtualCores());
+ break;
+ }
+ default:
+ break;
+ }
+ }
+
@Override
public long getVmemAllocatedForContainers() {
return this.maxVmemAllottedForContainers;
@@ -650,38 +690,53 @@ public class ContainersMonitorImpl extends AbstractService implements
}
@Override
+ @SuppressWarnings("unchecked")
public void handle(ContainersMonitorEvent monitoringEvent) {
-
+ ContainerId containerId = monitoringEvent.getContainerId();
if (!containersMonitorEnabled) {
+ if (monitoringEvent.getType() == ContainersMonitorEventType
+ .CHANGE_MONITORING_CONTAINER_RESOURCE) {
+ // Nothing to enforce. Update container resource immediately.
+ ChangeMonitoringContainerResourceEvent changeEvent =
+ (ChangeMonitoringContainerResourceEvent) monitoringEvent;
+ changeContainerResource(containerId, changeEvent.getResource());
+ }
return;
}
- ContainerId containerId = monitoringEvent.getContainerId();
switch (monitoringEvent.getType()) {
case START_MONITORING_CONTAINER:
ContainerStartMonitoringEvent startEvent =
(ContainerStartMonitoringEvent) monitoringEvent;
-
- if (containerMetricsEnabled) {
- ContainerMetrics usageMetrics = ContainerMetrics
- .forContainer(containerId, containerMetricsPeriodMs);
- usageMetrics.recordStateChangeDurations(
- startEvent.getLaunchDuration(),
- startEvent.getLocalizationDuration());
- }
-
- synchronized (this.containersToBeAdded) {
- ProcessTreeInfo processTreeInfo =
- new ProcessTreeInfo(containerId, null, null,
- startEvent.getVmemLimit(), startEvent.getPmemLimit(),
- startEvent.getCpuVcores());
- this.containersToBeAdded.put(containerId, processTreeInfo);
- }
+ LOG.info("Starting resource-monitoring for " + containerId);
+ updateContainerMetrics(monitoringEvent);
+ trackingContainers.put(containerId,
+ new ProcessTreeInfo(containerId, null, null,
+ startEvent.getVmemLimit(), startEvent.getPmemLimit(),
+ startEvent.getCpuVcores()));
break;
case STOP_MONITORING_CONTAINER:
- synchronized (this.containersToBeRemoved) {
- this.containersToBeRemoved.add(containerId);
+ LOG.info("Stopping resource-monitoring for " + containerId);
+ updateContainerMetrics(monitoringEvent);
+ trackingContainers.remove(containerId);
+ break;
+ case CHANGE_MONITORING_CONTAINER_RESOURCE:
+ ChangeMonitoringContainerResourceEvent changeEvent =
+ (ChangeMonitoringContainerResourceEvent) monitoringEvent;
+ ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
+ if (processTreeInfo == null) {
+ LOG.warn("Failed to track container "
+ + containerId.toString()
+ + ". It may have already completed.");
+ break;
}
+ LOG.info("Changing resource-monitoring for " + containerId);
+ updateContainerMetrics(monitoringEvent);
+ long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
+ long vmemLimit = (long) (pmemLimit * vmemRatio);
+ int cpuVcores = changeEvent.getResource().getVirtualCores();
+ processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
+ changeContainerResource(containerId, changeEvent.getResource());
break;
default:
// TODO: Wrong event.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d70f938/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index 9a05278..75bcdae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -211,6 +211,17 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
super.testIncreaseContainerResourceWithInvalidResource();
}
+ /**
+ * Runs the base container-resize test under the LinuxContainerExecutor,
+ * but only when the executor binary path system property is set.
+ */
+ @Override
+ public void testChangeContainerResource() throws Exception {
+ if (shouldRunTest()) {
+ LOG.info("Running testChangeContainerResource");
+ super.testChangeContainerResource();
+ } else {
+ // No container-executor binary supplied: skip without failing.
+ LOG.info("LCE binary path is not passed. Not running the test");
+ }
+ }
+
private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d70f938/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index e2f12ba..2ea9146 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -1046,6 +1046,102 @@ public class TestContainerManager extends BaseContainerManagerTest {
}
}
+ /**
+ * Starts a long-running container and increases its resource to 4096 MB
+ * and 2 vcores through increaseContainersResource. It then requests a
+ * decrease to 2048 MB and 2 vcores by handing a
+ * CMgrDecreaseContainersResourceEvent to the container manager. After each
+ * change it checks that the capability reported by getContainerStatuses
+ * matches the requested resource.
+ */
+ @Test
+ public void testChangeContainerResource() throws Exception {
+ containerManager.start();
+ File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+ PrintWriter fileWriter = new PrintWriter(scriptFile);
+ // Construct the Container-id
+ ContainerId cId = createContainerId(0);
+ // The script only waits (roughly 100 seconds), so the container stays
+ // running while its resource is changed below.
+ if (Shell.WINDOWS) {
+ fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+ } else {
+ fileWriter.write("\numask 0");
+ fileWriter.write("\nexec sleep 100");
+ }
+ fileWriter.close();
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ // Ship the script to the container as an application-visible local file.
+ URL resource_alpha =
+ ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
+ LocalResource rsrc_alpha =
+ recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(scriptFile.lastModified());
+ String destinationFile = "dest_file";
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ localResources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.setLocalResources(localResources);
+ List<String> commands =
+ Arrays.asList(Shell.getRunScriptCommand(scriptFile));
+ containerLaunchContext.setCommands(commands);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+ context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
+ // Make sure the container reaches RUNNING state
+ BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
+ org.apache.hadoop.yarn.server.nodemanager.
+ containermanager.container.ContainerState.RUNNING);
+ // Construct the container resource increase request.
+ List<Token> increaseTokens = new ArrayList<Token>();
+ // Add increase request: a token carrying the new target resource.
+ Resource targetResource = Resource.newInstance(4096, 2);
+ Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+ context.getNodeId(), user, targetResource,
+ context.getContainerTokenSecretManager(), null);
+ increaseTokens.add(containerToken);
+ IncreaseContainersResourceRequest increaseRequest =
+ IncreaseContainersResourceRequest.newInstance(increaseTokens);
+ IncreaseContainersResourceResponse increaseResponse =
+ containerManager.increaseContainersResource(increaseRequest);
+ Assert.assertEquals(
+ 1, increaseResponse.getSuccessfullyIncreasedContainers().size());
+ Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
+ // Check status
+ List<ContainerId> containerIds = new ArrayList<>();
+ containerIds.add(cId);
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
+ ContainerStatus containerStatus = containerManager
+ .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
+ // Check status immediately as resource increase is blocking
+ assertEquals(targetResource, containerStatus.getCapability());
+ // Simulate a decrease request
+ List<org.apache.hadoop.yarn.api.records.Container> containersToDecrease
+ = new ArrayList<>();
+ targetResource = Resource.newInstance(2048, 2);
+ org.apache.hadoop.yarn.api.records.Container decreasedContainer =
+ org.apache.hadoop.yarn.api.records.Container
+ .newInstance(cId, null, null, targetResource, null, null);
+ containersToDecrease.add(decreasedContainer);
+ containerManager.handle(
+ new CMgrDecreaseContainersResourceEvent(containersToDecrease));
+ // Check status with retry: the decrease is submitted as an event through
+ // handle(...) rather than the blocking increase call, so its effect may
+ // not be visible at once. Re-read the status up to 5 times, 200 ms apart.
+ containerStatus = containerManager
+ .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
+ int retry = 0;
+ while (!targetResource.equals(containerStatus.getCapability()) &&
+ (retry++ < 5)) {
+ Thread.sleep(200);
+ containerStatus = containerManager.getContainerStatuses(gcsRequest)
+ .getContainerStatuses().get(0);
+ }
+ assertEquals(targetResource, containerStatus.getCapability());
+ }
+
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d70f938/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
new file mode 100644
index 0000000..4a18a8c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+
+/**
+ * Stub {@link ResourceCalculatorPlugin} for monitor tests: every memory,
+ * processor and CPU query reports zero.
+ */
+public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin {
+
+ // ---- memory ----
+
+ @Override
+ public long getVirtualMemorySize() {
+ return 0L;
+ }
+
+ @Override
+ public long getPhysicalMemorySize() {
+ return 0L;
+ }
+
+ @Override
+ public long getAvailableVirtualMemorySize() {
+ return 0L;
+ }
+
+ @Override
+ public long getAvailablePhysicalMemorySize() {
+ return 0L;
+ }
+
+ // ---- processors ----
+
+ @Override
+ public int getNumProcessors() {
+ return 0;
+ }
+
+ @Override
+ public int getNumCores() {
+ return 0;
+ }
+
+ // ---- cpu ----
+
+ @Override
+ public long getCpuFrequency() {
+ return 0L;
+ }
+
+ @Override
+ public long getCumulativeCpuTime() {
+ return 0L;
+ }
+
+ @Override
+ public float getCpuUsage() {
+ return 0f;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d70f938/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java
new file mode 100644
index 0000000..c5aaa77
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+
+/**
+ * Stub process tree for monitor tests. It reports no cumulative CPU time,
+ * an empty dump, and always passes the pid/pgrpid match check. The RSS value
+ * it returns is whatever the test last set via
+ * {@link #setRssMemorySize(long)}.
+ */
+public class MockResourceCalculatorProcessTree extends ResourceCalculatorProcessTree {
+
+ // Value returned by getRssMemorySize(); 0 until a test sets it.
+ private long rssMemorySize;
+
+ public MockResourceCalculatorProcessTree(String root) {
+ super(root);
+ }
+
+ @Override
+ public void updateProcessTree() {
+ // Intentionally empty: tests set the reported values directly.
+ }
+
+ @Override
+ public String getProcessTreeDump() {
+ return "";
+ }
+
+ @Override
+ public long getCumulativeCpuTime() {
+ return 0L;
+ }
+
+ @Override
+ public boolean checkPidPgrpidForMatch() {
+ return true;
+ }
+
+ /** Sets the RSS value that {@link #getRssMemorySize()} will report. */
+ public void setRssMemorySize(long rssMemorySize) {
+ this.rssMemorySize = rssMemorySize;
+ }
+
+ /** Returns the RSS value most recently set by the test (default 0). */
+ public long getRssMemorySize() {
+ return rssMemorySize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d70f938/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
new file mode 100644
index 0000000..d7f89fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests that {@link ContainersMonitorImpl} applies new memory limits when it
+ * receives a {@link ChangeMonitoringContainerResourceEvent}. The monitor is
+ * configured with {@link MockResourceCalculatorPlugin} and
+ * {@link MockResourceCalculatorProcessTree}, so each test sets a container's
+ * RSS memory usage directly instead of running a real process.
+ */
+public class TestContainersMonitorResourceChange {
+
+  /** Maximum time to wait for an expected KILL_CONTAINER event. */
+  private static final long KILL_WAIT_TIMEOUT_MS = 10000L;
+  /** Delay between checks while waiting for a KILL_CONTAINER event. */
+  private static final long KILL_POLL_INTERVAL_MS = 20L;
+
+  private ContainersMonitorImpl containersMonitor;
+  private MockExecutor executor;
+  private Configuration conf;
+  private AsyncDispatcher dispatcher;
+  private Context context;
+  private MockContainerEventHandler containerEventHandler;
+
+  /**
+   * A {@link ContainerExecutor} that never runs anything: every operation is
+   * a no-op or reports success, and a container's process id is simply its
+   * numeric container id.
+   */
+  private static class MockExecutor extends ContainerExecutor {
+    @Override
+    public void init() throws IOException {
+    }
+
+    @Override
+    public void startLocalizer(LocalizerStartContext ctx)
+        throws IOException, InterruptedException {
+    }
+
+    @Override
+    public int launchContainer(ContainerStartContext ctx) throws
+        IOException {
+      return 0;
+    }
+
+    @Override
+    public boolean signalContainer(ContainerSignalContext ctx)
+        throws IOException {
+      return true;
+    }
+
+    @Override
+    public void deleteAsUser(DeletionAsUserContext ctx)
+        throws IOException, InterruptedException {
+    }
+
+    @Override
+    public String getProcessId(ContainerId containerId) {
+      return String.valueOf(containerId.getContainerId());
+    }
+
+    @Override
+    public boolean isContainerAlive(ContainerLivenessContext ctx)
+        throws IOException {
+      return true;
+    }
+  }
+
+  /**
+   * Records the ids of containers for which a KILL_CONTAINER event was
+   * dispatched; all other container events are ignored. Access to the set is
+   * synchronized because events are delivered by the {@link AsyncDispatcher}
+   * while the test thread queries it.
+   */
+  private static class MockContainerEventHandler implements
+      EventHandler<ContainerEvent> {
+    private final Set<ContainerId> killedContainer
+        = new HashSet<>();
+
+    @Override
+    public void handle(ContainerEvent event) {
+      if (event.getType() == ContainerEventType.KILL_CONTAINER) {
+        synchronized (killedContainer) {
+          killedContainer.add(event.getContainerID());
+        }
+      }
+    }
+
+    /** @return true if a KILL_CONTAINER event was seen for the container. */
+    public boolean isContainerKilled(ContainerId containerId) {
+      synchronized (killedContainer) {
+        return killedContainer.contains(containerId);
+      }
+    }
+  }
+
+  /**
+   * Starts a dispatcher that routes container events to
+   * {@link MockContainerEventHandler}, and prepares a configuration that
+   * plugs in the mock resource calculator and process tree. Each test creates
+   * the containers monitor itself, after choosing the monitoring interval.
+   */
+  @Before
+  public void setup() {
+    executor = new MockExecutor();
+    dispatcher = new AsyncDispatcher();
+    context = Mockito.mock(Context.class);
+    Mockito.doReturn(new ConcurrentSkipListMap<ContainerId, Container>())
+        .when(context).getContainers();
+    conf = new Configuration();
+    conf.set(
+        YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+        MockResourceCalculatorPlugin.class.getCanonicalName());
+    conf.set(
+        YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
+        MockResourceCalculatorProcessTree.class.getCanonicalName());
+    dispatcher.init(conf);
+    dispatcher.start();
+    containerEventHandler = new MockContainerEventHandler();
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+  }
+
+  /** Stops the containers monitor (if a test created one) and the dispatcher. */
+  @After
+  public void tearDown() throws Exception {
+    if (containersMonitor != null) {
+      containersMonitor.stop();
+    }
+    if (dispatcher != null) {
+      dispatcher.stop();
+    }
+  }
+
+  @Test
+  public void testContainersResourceChange() throws Exception {
+    // set container monitor interval to be 20ms
+    conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L);
+    containersMonitor = createContainersMonitor(executor, dispatcher, context);
+    containersMonitor.init(conf);
+    containersMonitor.start();
+    // create container 1
+    containersMonitor.handle(new ContainerStartMonitoringEvent(
+        getContainerId(1), 2100L, 1000L, 1, 0, 0));
+    // verify that this container is properly tracked
+    assertNotNull(getProcessTreeInfo(getContainerId(1)));
+    assertEquals(1000L, getProcessTreeInfo(getContainerId(1))
+        .getPmemLimit());
+    assertEquals(2100L, getProcessTreeInfo(getContainerId(1))
+        .getVmemLimit());
+    // sleep longer than the monitor interval to make sure resource
+    // enforcement has started
+    Thread.sleep(200);
+    // raise pmem usage to 2500, above the 1000 pmem limit: the container
+    // should be killed
+    MockResourceCalculatorProcessTree mockTree =
+        (MockResourceCalculatorProcessTree) getProcessTreeInfo(
+            getContainerId(1)).getProcessTree();
+    mockTree.setRssMemorySize(2500L);
+    // the kill is reported asynchronously, so poll with a generous timeout
+    // rather than relying on a single fixed sleep
+    assertTrue("container 1 should have been killed",
+        waitForContainerKilled(getContainerId(1)));
+    // create container 2
+    containersMonitor.handle(new ContainerStartMonitoringEvent(
+        getContainerId(2), 2202009L, 1048576L, 1, 0, 0));
+    // verify that this container is properly tracked
+    assertNotNull(getProcessTreeInfo(getContainerId(2)));
+    assertEquals(1048576L, getProcessTreeInfo(getContainerId(2))
+        .getPmemLimit());
+    assertEquals(2202009L, getProcessTreeInfo(getContainerId(2))
+        .getVmemLimit());
+    // trigger a change resource event, check limit after change
+    containersMonitor.handle(new ChangeMonitoringContainerResourceEvent(
+        getContainerId(2), Resource.newInstance(2, 1)));
+    // 2097152 == 2 * 1024 * 1024; 4404019 == floor(2097152 * 2.1), 2.1
+    // presumably being the configured vmem/pmem ratio
+    assertEquals(2097152L, getProcessTreeInfo(getContainerId(2))
+        .getPmemLimit());
+    assertEquals(4404019L, getProcessTreeInfo(getContainerId(2))
+        .getVmemLimit());
+    // sleep longer than the monitor interval to make sure resource
+    // enforcement has started
+    Thread.sleep(200);
+    // raise pmem usage above the original 1048576 limit but below the new
+    // 2097152 limit: the container should NOT be killed
+    mockTree =
+        (MockResourceCalculatorProcessTree) getProcessTreeInfo(
+            getContainerId(2)).getProcessTree();
+    mockTree.setRssMemorySize(2000000L);
+    // asserting the absence of a kill needs a fixed wait spanning several
+    // 20ms monitoring cycles; polling cannot shorten this
+    Thread.sleep(200);
+    assertFalse(containerEventHandler
+        .isContainerKilled(getContainerId(2)));
+  }
+
+  @Test
+  public void testContainersResourceChangeIsTriggeredImmediately()
+      throws Exception {
+    // set container monitor interval to be 20s
+    conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20000L);
+    containersMonitor = createContainersMonitor(executor, dispatcher, context);
+    containersMonitor.init(conf);
+    containersMonitor.start();
+    // sleep 1 second to make sure the container monitor thread is
+    // now waiting for the next monitor cycle
+    Thread.sleep(1000);
+    // create a container with id 3
+    containersMonitor.handle(new ContainerStartMonitoringEvent(
+        getContainerId(3), 2202009L, 1048576L, 1, 0, 0));
+    // Verify that this container has been tracked
+    assertNotNull(getProcessTreeInfo(getContainerId(3)));
+    // trigger a change resource event, check limit after change
+    containersMonitor.handle(new ChangeMonitoringContainerResourceEvent(
+        getContainerId(3), Resource.newInstance(2, 1)));
+    // verify that this container has been properly tracked with the
+    // correct size, well before the 20s monitoring cycle could run
+    assertEquals(2097152L, getProcessTreeInfo(getContainerId(3))
+        .getPmemLimit());
+    assertEquals(4404019L, getProcessTreeInfo(getContainerId(3))
+        .getVmemLimit());
+  }
+
+  /**
+   * Polls the event handler until a KILL_CONTAINER event has been recorded
+   * for the given container or {@link #KILL_WAIT_TIMEOUT_MS} elapses.
+   *
+   * @param containerId container expected to be killed.
+   * @return true if the kill was observed within the timeout, false otherwise.
+   * @throws InterruptedException if interrupted while sleeping between polls.
+   */
+  private boolean waitForContainerKilled(ContainerId containerId)
+      throws InterruptedException {
+    // nanoTime is monotonic, so wall-clock adjustments cannot skew the wait
+    long deadlineNanos = System.nanoTime() + KILL_WAIT_TIMEOUT_MS * 1000000L;
+    while (!containerEventHandler.isContainerKilled(containerId)) {
+      if (System.nanoTime() - deadlineNanos >= 0) {
+        return false;
+      }
+      Thread.sleep(KILL_POLL_INTERVAL_MS);
+    }
+    return true;
+  }
+
+  /** Creates a monitor wired to the given executor, dispatcher and context. */
+  private ContainersMonitorImpl createContainersMonitor(
+      ContainerExecutor containerExecutor, AsyncDispatcher eventDispatcher,
+      Context nmContext) {
+    return new ContainersMonitorImpl(containerExecutor, eventDispatcher,
+        nmContext);
+  }
+
+  /** @return container {@code id} of attempt 1 of a fixed test application. */
+  private ContainerId getContainerId(int id) {
+    return ContainerId.newContainerId(ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(123456L, 1), 1), id);
+  }
+
+  /** @return the monitor's tracking entry for the container, or null if none. */
+  private ProcessTreeInfo getProcessTreeInfo(ContainerId id) {
+    return containersMonitor.trackingContainers.get(id);
+  }
+}