You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2019/06/26 21:01:53 UTC

[hadoop] branch trunk updated: YARN-6055. ContainersMonitorImpl need be adjusted when NM resource changed. Contributed by Inigo Goiri.

This is an automated email from the ASF dual-hosted git repository.

gifuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1ac967a  YARN-6055. ContainersMonitorImpl need be adjusted when NM resource changed. Contributed by Inigo Goiri.
1ac967a is described below

commit 1ac967a6b77c262b23e10c6ca68538b7e4ed39b0
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Wed Jun 26 14:01:31 2019 -0700

    YARN-6055. ContainersMonitorImpl need be adjusted when NM resource changed. Contributed by Inigo Goiri.
---
 .../server/nodemanager/NodeStatusUpdaterImpl.java  |   5 +
 .../yarn/server/nodemanager/ResourceView.java      |   8 ++
 .../monitor/ContainersMonitor.java                 |   6 ++
 .../monitor/ContainersMonitorImpl.java             |  49 +++++++---
 .../server/nodemanager/TestNodeStatusUpdater.java  | 103 +++++++++++++++++++++
 5 files changed, 156 insertions(+), 15 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 8022a07..181094e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -582,6 +582,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private void updateNMResource(Resource resource) {
     metrics.addResource(Resources.subtract(resource, totalResource));
     this.totalResource = resource;
+
+    // Update the containers monitor
+    ContainersMonitor containersMonitor =
+        this.context.getContainerManager().getContainersMonitor();
+    containersMonitor.setAllocatedResourcesForContainers(totalResource);
   }
 
   // Iterate through the NMContext and clone and get all the containers'
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java
index 4fde7b9..02413c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java
@@ -20,10 +20,18 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 public interface ResourceView {
 
+  /**
+   * Get virtual memory allocated to the containers.
+   * @return Virtual memory in bytes.
+   */
   long getVmemAllocatedForContainers();
 
   boolean isVmemCheckEnabled();
 
+  /**
+   * Get physical memory allocated to the containers.
+   * @return Physical memory in bytes.
+   */
   long getPmemAllocatedForContainers();
 
   boolean isPmemCheckEnabled();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index daecc28..002035b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -64,4 +64,10 @@ public interface ContainersMonitor extends Service,
         * containersMonitor.getVmemRatio());
     resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
   }
+
+  /**
+   * Set the allocated resources for containers.
+   * @param resource Resources allocated for the containers.
+   */
+  void setAllocatedResourcesForContainers(Resource resource);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 6d3791e..8792910 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -84,7 +84,9 @@ public class ContainersMonitorImpl extends AbstractService implements
   private static float vmemRatio;
   private Class<? extends ResourceCalculatorProcessTree> processTreeClass;
 
+  /** Maximum virtual memory in bytes. */
   private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
+  /** Maximum physical memory in bytes. */
   private long maxPmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
 
   private boolean pmemCheckEnabled;
@@ -152,25 +154,23 @@ public class ContainersMonitorImpl extends AbstractService implements
 
     long configuredPMemForContainers =
         NodeManagerHardwareUtils.getContainerMemoryMB(
-            this.resourceCalculatorPlugin, this.conf) * 1024 * 1024L;
+            this.resourceCalculatorPlugin, this.conf);
 
-    long configuredVCoresForContainers =
-        NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin,
-            this.conf);
-
-    // Setting these irrespective of whether checks are enabled. Required in
-    // the UI.
-    // ///////// Physical memory configuration //////
-    this.maxPmemAllottedForContainers = configuredPMemForContainers;
-    this.maxVCoresAllottedForContainers = configuredVCoresForContainers;
+    int configuredVCoresForContainers =
+        NodeManagerHardwareUtils.getVCores(
+            this.resourceCalculatorPlugin, this.conf);
 
     // ///////// Virtual memory configuration //////
     vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
         YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
     Preconditions.checkArgument(vmemRatio > 0.99f,
         YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
-    this.maxVmemAllottedForContainers =
-        (long) (vmemRatio * configuredPMemForContainers);
+
+    // Setting these irrespective of whether checks are enabled.
+    // Required in the UI.
+    Resource resourcesForContainers = Resource.newInstance(
+        configuredPMemForContainers, configuredVCoresForContainers);
+    setAllocatedResourcesForContainers(resourcesForContainers);
 
     pmemCheckEnabled = this.conf.getBoolean(
         YarnConfiguration.NM_PMEM_CHECK_ENABLED,
@@ -908,6 +908,16 @@ public class ContainersMonitorImpl extends AbstractService implements
     return this.maxVCoresAllottedForContainers;
   }
 
+  @Override
+  public void setAllocatedResourcesForContainers(final Resource resource) {
+    LOG.info("Setting the resources allocated to containers to {}", resource);
+    this.maxVCoresAllottedForContainers = resource.getVirtualCores();
+    this.maxPmemAllottedForContainers = convertMBytesToBytes(
+        resource.getMemorySize());
+    this.maxVmemAllottedForContainers =
+        (long) (getVmemRatio() * maxPmemAllottedForContainers);
+  }
+
   /**
    * Is the total virtual memory check enabled?
    *
@@ -973,10 +983,10 @@ public class ContainersMonitorImpl extends AbstractService implements
       }
       LOG.info("Changing resource-monitoring for {}", containerId);
       updateContainerMetrics(monitoringEvent);
-      long pmemLimit =
-          changeEvent.getResource().getMemorySize() * 1024L * 1024L;
+      Resource resource = changeEvent.getResource();
+      long pmemLimit = convertMBytesToBytes(resource.getMemorySize());
       long vmemLimit = (long) (pmemLimit * vmemRatio);
-      int cpuVcores = changeEvent.getResource().getVirtualCores();
+      int cpuVcores = resource.getVirtualCores();
       processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
     }
   }
@@ -999,4 +1009,13 @@ public class ContainersMonitorImpl extends AbstractService implements
             startEvent.getVmemLimit(), startEvent.getPmemLimit(),
             startEvent.getCpuVcores()));
   }
+
+  /**
+   * Convert MegaBytes to Bytes.
+   * @param mb MegaBytes (MB).
+   * @return Bytes representing the input MB.
+   */
+  private static long convertMBytesToBytes(long mb) {
+    return mb * 1024L * 1024L;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 5ba0bef..1b21b93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -57,6 +59,7 @@ import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
@@ -80,6 +83,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
 import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
@@ -96,11 +100,13 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -118,6 +124,10 @@ import org.apache.hadoop.ipc.Server;
 
 @SuppressWarnings("rawtypes")
 public class TestNodeStatusUpdater extends NodeManagerTestBase {
+
+  /** Bytes in a GigaByte. */
+  private static final long GB = 1024L * 1024L * 1024L;
+
   volatile int heartBeatID = 0;
   volatile Throwable nmStartError = null;
   private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
@@ -1774,6 +1784,99 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
     Assert.assertTrue("Test failed with exception(s)" + exceptions,
         exceptions.isEmpty());
   }
+
+  /**
+   * Test if the {@link NodeManager} updates the resources in the
+   * {@link ContainersMonitor} when the {@link ResourceManager} triggers the
+   * change.
+   * @throws Exception If the test cannot run.
+   */
+  @Test
+  public void testUpdateNMResources() throws Exception {
+
+    // The resource set for the Node Manager from the Resource Tracker
+    final Resource resource = Resource.newInstance(8 * 1024, 1);
+
+    LOG.info("Start the Resource Tracker to mock heartbeats");
+    Server resourceTracker = getMockResourceTracker(resource);
+    resourceTracker.start();
+
+    LOG.info("Start the Node Manager");
+    NodeManager nodeManager = new NodeManager();
+    YarnConfiguration nmConf = new YarnConfiguration();
+    nmConf.setSocketAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        resourceTracker.getListenerAddress());
+    nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
+    nodeManager.init(nmConf);
+    nodeManager.start();
+
+    LOG.info("Initially the Node Manager should have the default resources");
+    ContainerManager containerManager = nodeManager.getContainerManager();
+    ContainersMonitor containerMonitor =
+        containerManager.getContainersMonitor();
+    assertEquals(8, containerMonitor.getVCoresAllocatedForContainers());
+    assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers());
+
+    LOG.info("The first heartbeat should trigger a resource change to {}",
+        resource);
+    GenericTestUtils.waitFor(
+        () -> containerMonitor.getVCoresAllocatedForContainers() == 1,
+        100, 2 * 1000);
+    assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers());
+
+    resource.setVirtualCores(5);
+    resource.setMemorySize(4 * 1024);
+    LOG.info("Change the resources to {}", resource);
+    GenericTestUtils.waitFor(
+        () -> containerMonitor.getVCoresAllocatedForContainers() == 5,
+        100, 2 * 1000);
+    assertEquals(4 * GB, containerMonitor.getPmemAllocatedForContainers());
+
+    LOG.info("Cleanup");
+    nodeManager.stop();
+    nodeManager.close();
+    resourceTracker.stop();
+  }
+
+  /**
+   * Create a mock Resource Tracker server that returns the resources we want
+   * in the heartbeat.
+   * @param resource Resource to reply in the heartbeat.
+   * @return RPC server for the Resource Tracker.
+   * @throws Exception If it cannot create the Resource Tracker.
+   */
+  private static Server getMockResourceTracker(final Resource resource)
+      throws Exception {
+
+    // Setup the mock Resource Tracker
+    final ResourceTracker rt = mock(ResourceTracker.class);
+    when(rt.registerNodeManager(any())).thenAnswer(invocation -> {
+      RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
+          RegisterNodeManagerResponse.class);
+      response.setContainerTokenMasterKey(createMasterKey());
+      response.setNMTokenMasterKey(createMasterKey());
+      return response;
+    });
+    when(rt.nodeHeartbeat(any())).thenAnswer(invocation -> {
+      NodeHeartbeatResponse response = recordFactory.newRecordInstance(
+          NodeHeartbeatResponse.class);
+      response.setResource(resource);
+      return response;
+    });
+    when(rt.unRegisterNodeManager(any())).thenAnswer(invocaiton -> {
+      UnRegisterNodeManagerResponse response = recordFactory.newRecordInstance(
+          UnRegisterNodeManagerResponse.class);
+      return response;
+    });
+
+    // Get the RPC server
+    YarnConfiguration conf = new YarnConfiguration();
+    YarnRPC rpc = YarnRPC.create(conf);
+    Server server = rpc.getServer(ResourceTracker.class, rt,
+        new InetSocketAddress("0.0.0.0", 0), conf, null, 1);
+    return server;
+  }
+
   // Add new containers info into NM context each time node heart beats.
   private class MyNMContext extends NMContext {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org