You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2019/06/26 21:01:53 UTC
[hadoop] branch trunk updated: YARN-6055. ContainersMonitorImpl
need be adjusted when NM resource changed. Contributed by Inigo Goiri.
This is an automated email from the ASF dual-hosted git repository.
gifuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1ac967a YARN-6055. ContainersMonitorImpl need be adjusted when NM resource changed. Contributed by Inigo Goiri.
1ac967a is described below
commit 1ac967a6b77c262b23e10c6ca68538b7e4ed39b0
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Wed Jun 26 14:01:31 2019 -0700
YARN-6055. ContainersMonitorImpl need be adjusted when NM resource changed. Contributed by Inigo Goiri.
---
.../server/nodemanager/NodeStatusUpdaterImpl.java | 5 +
.../yarn/server/nodemanager/ResourceView.java | 8 ++
.../monitor/ContainersMonitor.java | 6 ++
.../monitor/ContainersMonitorImpl.java | 49 +++++++---
.../server/nodemanager/TestNodeStatusUpdater.java | 103 +++++++++++++++++++++
5 files changed, 156 insertions(+), 15 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 8022a07..181094e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -582,6 +582,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private void updateNMResource(Resource resource) {
metrics.addResource(Resources.subtract(resource, totalResource));
this.totalResource = resource;
+
+ // Keep the containers monitor's allocation in sync with the new total
+ ContainersMonitor containersMonitor =
+ this.context.getContainerManager().getContainersMonitor();
+ containersMonitor.setAllocatedResourcesForContainers(totalResource);
}
// Iterate through the NMContext and clone and get all the containers'
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java
index 4fde7b9..02413c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java
@@ -20,10 +20,18 @@ package org.apache.hadoop.yarn.server.nodemanager;
public interface ResourceView {
+ /**
+ * Get virtual memory allocated to the containers.
+ * @return Virtual memory in bytes.
+ */
long getVmemAllocatedForContainers();
boolean isVmemCheckEnabled();
+ /**
+ * Get physical memory allocated to the containers.
+ * @return Physical memory in bytes.
+ */
long getPmemAllocatedForContainers();
boolean isPmemCheckEnabled();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index daecc28..002035b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -64,4 +64,10 @@ public interface ContainersMonitor extends Service,
* containersMonitor.getVmemRatio());
resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
}
+
+ /**
+ * Set the allocated resources for containers.
+ * @param resource Resources allocated for the containers.
+ */
+ void setAllocatedResourcesForContainers(Resource resource);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 6d3791e..8792910 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -84,7 +84,9 @@ public class ContainersMonitorImpl extends AbstractService implements
private static float vmemRatio;
private Class<? extends ResourceCalculatorProcessTree> processTreeClass;
+ /** Maximum virtual memory in bytes. */
private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
+ /** Maximum physical memory in bytes. */
private long maxPmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
private boolean pmemCheckEnabled;
@@ -152,25 +154,23 @@ public class ContainersMonitorImpl extends AbstractService implements
long configuredPMemForContainers =
NodeManagerHardwareUtils.getContainerMemoryMB(
- this.resourceCalculatorPlugin, this.conf) * 1024 * 1024L;
+ this.resourceCalculatorPlugin, this.conf);
- long configuredVCoresForContainers =
- NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin,
- this.conf);
-
- // Setting these irrespective of whether checks are enabled. Required in
- // the UI.
- // ///////// Physical memory configuration //////
- this.maxPmemAllottedForContainers = configuredPMemForContainers;
- this.maxVCoresAllottedForContainers = configuredVCoresForContainers;
+ int configuredVCoresForContainers =
+ NodeManagerHardwareUtils.getVCores(
+ this.resourceCalculatorPlugin, this.conf);
// ///////// Virtual memory configuration //////
vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
Preconditions.checkArgument(vmemRatio > 0.99f,
YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
- this.maxVmemAllottedForContainers =
- (long) (vmemRatio * configuredPMemForContainers);
+
+ // Setting these irrespective of whether checks are enabled.
+ // Required in the UI.
+ Resource resourcesForContainers = Resource.newInstance(
+ configuredPMemForContainers, configuredVCoresForContainers);
+ setAllocatedResourcesForContainers(resourcesForContainers);
pmemCheckEnabled = this.conf.getBoolean(
YarnConfiguration.NM_PMEM_CHECK_ENABLED,
@@ -908,6 +908,16 @@ public class ContainersMonitorImpl extends AbstractService implements
return this.maxVCoresAllottedForContainers;
}
+ @Override
+ public void setAllocatedResourcesForContainers(final Resource resource) {
+ LOG.info("Setting the resources allocated to containers to {}", resource);
+ this.maxVCoresAllottedForContainers = resource.getVirtualCores();
+ this.maxPmemAllottedForContainers = convertMBytesToBytes(
+ resource.getMemorySize());
+ this.maxVmemAllottedForContainers =
+ (long) (getVmemRatio() * maxPmemAllottedForContainers);
+ }
+
/**
* Is the total virtual memory check enabled?
*
@@ -973,10 +983,10 @@ public class ContainersMonitorImpl extends AbstractService implements
}
LOG.info("Changing resource-monitoring for {}", containerId);
updateContainerMetrics(monitoringEvent);
- long pmemLimit =
- changeEvent.getResource().getMemorySize() * 1024L * 1024L;
+ Resource resource = changeEvent.getResource();
+ long pmemLimit = convertMBytesToBytes(resource.getMemorySize());
long vmemLimit = (long) (pmemLimit * vmemRatio);
- int cpuVcores = changeEvent.getResource().getVirtualCores();
+ int cpuVcores = resource.getVirtualCores();
processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
}
}
@@ -999,4 +1009,13 @@ public class ContainersMonitorImpl extends AbstractService implements
startEvent.getVmemLimit(), startEvent.getPmemLimit(),
startEvent.getCpuVcores()));
}
+
+  /**
+   * Convert a size in megabytes (1 MB = 1024 * 1024 bytes) to bytes.
+   * @param mb Size in megabytes.
+   * @return The same size expressed in bytes.
+   */
+  private static long convertMBytesToBytes(long mb) {
+    return mb << 20;
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 5ba0bef..1b21b93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -57,6 +59,7 @@ import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
@@ -80,6 +83,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
@@ -96,11 +100,13 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -118,6 +124,10 @@ import org.apache.hadoop.ipc.Server;
@SuppressWarnings("rawtypes")
public class TestNodeStatusUpdater extends NodeManagerTestBase {
+
+ /** Bytes in a GigaByte. */
+ private static final long GB = 1024L * 1024L * 1024L;
+
volatile int heartBeatID = 0;
volatile Throwable nmStartError = null;
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
@@ -1774,6 +1784,99 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
Assert.assertTrue("Test failed with exception(s)" + exceptions,
exceptions.isEmpty());
}
+
+  /**
+   * Test if the {@link NodeManager} updates the resources in the
+   * {@link ContainersMonitor} when the ResourceManager triggers the change
+   * through a heartbeat response.
+   * @throws Exception If the test cannot run.
+   */
+  @Test
+  public void testUpdateNMResources() throws Exception {
+    // Resource returned in every mock heartbeat; mutated below to change it
+    final Resource resource = Resource.newInstance(8 * 1024, 1);
+
+    LOG.info("Start the Resource Tracker to mock heartbeats");
+    Server resourceTracker = getMockResourceTracker(resource);
+    resourceTracker.start();
+
+    LOG.info("Start the Node Manager");
+    NodeManager nodeManager = new NodeManager();
+    YarnConfiguration nmConf = new YarnConfiguration();
+    nmConf.setSocketAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        resourceTracker.getListenerAddress());
+    nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
+    nodeManager.init(nmConf);
+
+    // Check before start(): the first heartbeat may replace the defaults
+    LOG.info("Initially the Node Manager should have the default resources");
+    ContainerManager containerManager = nodeManager.getContainerManager();
+    ContainersMonitor containerMonitor =
+        containerManager.getContainersMonitor();
+    assertEquals(8, containerMonitor.getVCoresAllocatedForContainers());
+    assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers());
+    nodeManager.start();
+
+    LOG.info("The first heartbeat should trigger a resource change to {}",
+        resource);
+    GenericTestUtils.waitFor(
+        () -> containerMonitor.getVCoresAllocatedForContainers() == 1,
+        100, 2 * 1000);
+    assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers());
+
+    resource.setVirtualCores(5);
+    resource.setMemorySize(4 * 1024);
+    LOG.info("Change the resources to {}", resource);
+    // Wait for both values: vcores and memory are not updated atomically
+    GenericTestUtils.waitFor(
+        () -> containerMonitor.getVCoresAllocatedForContainers() == 5
+            && containerMonitor.getPmemAllocatedForContainers() == 4 * GB,
+        100, 2 * 1000);
+    LOG.info("Cleanup");
+    nodeManager.stop();
+    nodeManager.close();
+    resourceTracker.stop();
+  }
+
+  /**
+   * Create a mock Resource Tracker server that returns the resources we want
+   * in the heartbeat. Registration replies with container and NM token master
+   * keys; unregistration replies with a new UnRegisterNodeManagerResponse.
+   * @param resource Resource to reply in every heartbeat. The same instance
+   *     is returned each time, so changes to it affect later heartbeats.
+   * @return RPC server for the Resource Tracker; the caller must start it.
+   * @throws Exception If it cannot create the Resource Tracker.
+   */
+  private static Server getMockResourceTracker(final Resource resource)
+      throws Exception {
+
+    // Setup the mock Resource Tracker
+    final ResourceTracker rt = mock(ResourceTracker.class);
+    when(rt.registerNodeManager(any())).thenAnswer(invocation -> {
+      RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
+          RegisterNodeManagerResponse.class);
+      response.setContainerTokenMasterKey(createMasterKey());
+      response.setNMTokenMasterKey(createMasterKey());
+      return response;
+    });
+    when(rt.nodeHeartbeat(any())).thenAnswer(invocation -> {
+      NodeHeartbeatResponse response = recordFactory.newRecordInstance(
+          NodeHeartbeatResponse.class);
+      response.setResource(resource);
+      return response;
+    });
+    when(rt.unRegisterNodeManager(any())).thenAnswer(invocation ->
+        recordFactory.newRecordInstance(
+            UnRegisterNodeManagerResponse.class));
+
+    // Serve the mock over RPC on an ephemeral port with a single handler
+    YarnConfiguration conf = new YarnConfiguration();
+    YarnRPC rpc = YarnRPC.create(conf);
+    Server server = rpc.getServer(ResourceTracker.class, rt,
+        new InetSocketAddress("0.0.0.0", 0), conf, null, 1);
+    return server;
+  }
+
// Add new containers info into NM context each time node heart beats.
private class MyNMContext extends NMContext {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org