You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/03/28 20:32:01 UTC

hadoop git commit: YARN-998. Keep NM resource updated through dynamic resource config for RM/NM restart. Contributed by Junping Du (cherry picked from commit c7d843af3b1adde602fe0335f52ffa28b0371bf4)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 d71843558 -> 0c84f9aee


YARN-998. Keep NM resource updated through dynamic resource config for RM/NM restart. Contributed by Junping Du
(cherry picked from commit c7d843af3b1adde602fe0335f52ffa28b0371bf4)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c84f9ae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c84f9ae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c84f9ae

Branch: refs/heads/branch-2.8
Commit: 0c84f9aee2c408d6d79c12697129307cc8f97f01
Parents: d718435
Author: Jian He <ji...@apache.org>
Authored: Mon Mar 28 11:12:33 2016 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Mon Mar 28 11:28:59 2016 -0700

----------------------------------------------------------------------
 .../server/resourcemanager/AdminService.java    | 40 +++++-----
 .../resourcemanager/ResourceTrackerService.java | 80 ++++++++++++++++----
 .../resource/DynamicResourceConfiguration.java  | 13 ++--
 .../resourcemanager/TestRMAdminService.java     | 78 ++++++++++++++++++-
 4 files changed, 166 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c84f9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 80ff3cc..36c6098 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -622,34 +622,32 @@ public class AdminService extends CompositeService implements
     try {
       Configuration conf = getConfig();
       Configuration configuration = new Configuration(conf);
-      DynamicResourceConfiguration newconf;
-
-      InputStream DRInputStream =
-        this.rmContext.getConfigurationProvider()
-        .getConfigurationInputStream(configuration,
-          YarnConfiguration.DR_CONFIGURATION_FILE);
-      if (DRInputStream != null) {
-        configuration.addResource(DRInputStream);
-        newconf = new DynamicResourceConfiguration(configuration, false);
+      DynamicResourceConfiguration newConf;
+
+      InputStream drInputStream =
+          this.rmContext.getConfigurationProvider().getConfigurationInputStream(
+              configuration, YarnConfiguration.DR_CONFIGURATION_FILE);
+
+      if (drInputStream != null) {
+        newConf = new DynamicResourceConfiguration(configuration,
+            drInputStream);
       } else {
-        newconf = new DynamicResourceConfiguration(configuration, true);
+        newConf = new DynamicResourceConfiguration(configuration);
       }
 
-      if (newconf.getNodes() == null || newconf.getNodes().length == 0) {
-        RMAuditLogger.logSuccess(user.getShortUserName(), argName,
-            "AdminService");
-        return response;
-      } else {
+      if (newConf.getNodes() != null && newConf.getNodes().length != 0) {
         Map<NodeId, ResourceOption> nodeResourceMap =
-          newconf.getNodeResourceMap();
-
+            newConf.getNodeResourceMap();
         UpdateNodeResourceRequest updateRequest =
-          UpdateNodeResourceRequest.newInstance(nodeResourceMap);
+            UpdateNodeResourceRequest.newInstance(nodeResourceMap);
         updateNodeResource(updateRequest);
-        RMAuditLogger.logSuccess(user.getShortUserName(), argName,
-          "AdminService");
-        return response;
       }
+      // refresh dynamic resource in ResourceTrackerService
+      this.rmContext.getResourceTrackerService().
+          updateDynamicResourceConfiguration(newConf);
+      RMAuditLogger.logSuccess(user.getShortUserName(), argName,
+              "AdminService");
+      return response;
     } catch (IOException ioe) {
       throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c84f9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index bd24b25..f52b5ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@@ -105,6 +107,7 @@ public class ResourceTrackerService extends AbstractService implements
 
   private boolean isDistributedNodeLabelsConf;
   private boolean isDelegatedCentralizedNodeLabelsConf;
+  private volatile DynamicResourceConfiguration drConf;
 
   public ResourceTrackerService(RMContext rmContext,
       NodesListManager nodesListManager,
@@ -139,11 +142,11 @@ public class ResourceTrackerService extends AbstractService implements
     }
 
     minAllocMb = conf.getInt(
-    	YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-    	YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
     minAllocVcores = conf.getInt(
-    	YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-    	YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
 
     minimumNodeManagerVersion = conf.get(
         YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
@@ -156,9 +159,42 @@ public class ResourceTrackerService extends AbstractService implements
           YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
     }
 
+    loadDynamicResourceConfiguration(conf);
+
     super.serviceInit(conf);
   }
 
+  /**
+   * Load DynamicResourceConfiguration from dynamic-resources.xml.
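+   * @param conf configuration passed to the configuration provider
+   * @throws IOException if the dynamic resource configuration fails to load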
+   */
+  public void loadDynamicResourceConfiguration(Configuration conf)
+      throws IOException {
+    try {
+      // load dynamic-resources.xml
+      InputStream drInputStream = this.rmContext.getConfigurationProvider()
+          .getConfigurationInputStream(conf,
+          YarnConfiguration.DR_CONFIGURATION_FILE);
+      if (drInputStream != null) {
+        this.drConf = new DynamicResourceConfiguration(conf, drInputStream);
+      } else {
+        this.drConf = new DynamicResourceConfiguration(conf);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Update DynamicResourceConfiguration with new configuration.
+   * @param conf the new dynamic resource configuration to use
+   */
+  public void updateDynamicResourceConfiguration(
+      DynamicResourceConfiguration conf) {
+    this.drConf = conf;
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     super.serviceStart();
@@ -166,15 +202,14 @@ public class ResourceTrackerService extends AbstractService implements
     // security is enabled, so no secretManager.
     Configuration conf = getConfig();
     YarnRPC rpc = YarnRPC.create(conf);
-    this.server =
-      rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
-          conf, null,
-          conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, 
-              YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
-    
+    this.server = rpc.getServer(
+        ResourceTracker.class, this, resourceTrackerAddress, conf, null,
+        conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
+
     // Enable service authorization?
     if (conf.getBoolean(
-        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
         false)) {
       InputStream inputStream =
           this.rmContext.getConfigurationProvider()
@@ -185,12 +220,12 @@ public class ResourceTrackerService extends AbstractService implements
       }
       refreshServiceAcls(conf, RMPolicyProvider.getInstance());
     }
- 
+
     this.server.start();
     conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
-			   YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
-			   YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
-                           server.getListenerAddress());
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        server.getListenerAddress());
   }
 
   @Override
@@ -290,6 +325,19 @@ public class ResourceTrackerService extends AbstractService implements
       return response;
     }
 
+    // check if node's capacity is loaded from dynamic-resources.xml
+    String[] nodes = this.drConf.getNodes();
+    String nid = nodeId.toString();
+
+    if (nodes != null && Arrays.asList(nodes).contains(nid)) {
+      capability.setMemory(this.drConf.getMemoryPerNode(nid));
+      capability.setVirtualCores(this.drConf.getVcoresPerNode(nid));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Resource for node: " + nid + " is adjusted to " +
+            capability + " due to settings in dynamic-resources.xml.");
+      }
+    }
+
     // Check if this node has minimum allocations
     if (capability.getMemory() < minAllocMb
         || capability.getVirtualCores() < minAllocVcores) {
@@ -306,7 +354,7 @@ public class ResourceTrackerService extends AbstractService implements
     response.setContainerTokenMasterKey(containerTokenSecretManager
         .getCurrentKey());
     response.setNMTokenMasterKey(nmTokenSecretManager
-        .getCurrentKey());    
+        .getCurrentKey());
 
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
         resolve(host), capability, nodeManagerVersion);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c84f9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java
index dd37801..045c7bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resource;
 
+import java.io.InputStream;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -38,8 +40,6 @@ public class DynamicResourceConfiguration extends Configuration {
   private static final Log LOG =
     LogFactory.getLog(DynamicResourceConfiguration.class);
 
-  private static final String DR_CONFIGURATION_FILE = "dynamic-resources.xml";
-
   @Private
   public static final String PREFIX = "yarn.resource.dynamic.";
 
@@ -63,15 +63,14 @@ public class DynamicResourceConfiguration extends Configuration {
   }
 
   public DynamicResourceConfiguration(Configuration configuration) {
-    this(configuration, true);
+    super(configuration);
+    addResource(YarnConfiguration.DR_CONFIGURATION_FILE);
   }
 
   public DynamicResourceConfiguration(Configuration configuration,
-      boolean useLocalConfigurationProvider) {
+      InputStream drInputStream) {
     super(configuration);
-    if (useLocalConfigurationProvider) {
-      addResource(DR_CONFIGURATION_FILE);
-    }
+    addResource(drInputStream);
   }
 
   private String getNodePrefix(String node) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c84f9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index 639b955..4513cbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -27,7 +27,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -202,7 +204,7 @@ public class TestRMAdminService {
   }
 
   @Test
-  public void testAdminRefreshNodesResourcesWithFileSystemBasedConfigurationProvider()
+  public void testRefreshNodesResourceWithFileSystemBasedConfigurationProvider()
       throws IOException, YarnException {
     configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
         "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
@@ -240,6 +242,75 @@ public class TestRMAdminService {
   }
 
   @Test
+  public void testResourcePersistentForNMRegistrationWithNewResource()
+      throws IOException, YarnException {
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+
+    //upload default configurations
+    uploadDefaultConfiguration();
+
+    try {
+      rm = new MockRM(configuration);
+      rm.init(configuration);
+      rm.start();
+      rm.registerNode("h1:1234", 5120);
+    } catch(Exception ex) {
+      fail("Should not get any exceptions");
+    }
+
+    NodeId nid = ConverterUtils.toNodeId("h1:1234");
+    RMNode ni = rm.getRMContext().getRMNodes().get(nid);
+    Resource resource = ni.getTotalCapability();
+    Assert.assertEquals("<memory:5120, vCores:5>", resource.toString());
+
+    DynamicResourceConfiguration drConf =
+        new DynamicResourceConfiguration();
+    drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
+    drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
+    drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
+    uploadConfiguration(drConf, "dynamic-resources.xml");
+
+    rm.adminService.refreshNodesResources(
+        RefreshNodesResourcesRequest.newInstance());
+
+    try {
+      // register the same node again with a different resource.
+      // validate this won't work as the resource is cached on the RM side.
+      rm.registerNode("h1:1234", 8192, 8);
+    } catch (Exception ex) {
+      fail("Should not get any exceptions");
+    }
+
+    RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
+    Resource resourceAfter = niAfter.getTotalCapability();
+    Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
+
+    // Replace original dr file with an empty dr file, and validate node
+    // registration with new resources will take effect now.
+    deleteOnRemoteFileSystem("dynamic-resources.xml");
+    DynamicResourceConfiguration emptyDRConf =
+        new DynamicResourceConfiguration();
+
+    uploadConfiguration(emptyDRConf, "dynamic-resources.xml");
+    rm.adminService.refreshNodesResources(
+        RefreshNodesResourcesRequest.newInstance());
+    try {
+      // register the same node a third time; this time the registered
+      // resource should work.
+      rm.registerNode("h1:1234", 8192, 8);
+    } catch (Exception ex) {
+      fail("Should not get any exceptions");
+    }
+
+    niAfter = rm.getRMContext().getRMNodes().get(nid);
+    resourceAfter = niAfter.getTotalCapability();
+    // new resource in registration should take effect as we have already
+    // emptied the dynamic resource file.
+    Assert.assertEquals("<memory:8192, vCores:8>", resourceAfter.toString());
+  }
+
+  @Test
   public void testAdminAclsWithLocalConfigurationProvider() {
     rm = new MockRM(configuration);
     rm.init(configuration);
@@ -1006,6 +1077,11 @@ public class TestRMAdminService {
     uploadToRemoteFileSystem(new Path(csConfFile));
   }
 
+  private void deleteOnRemoteFileSystem(String fileName)
+      throws IOException {
+    fs.delete(new Path(workingPath, fileName));
+  }
+
   private void uploadDefaultConfiguration() throws IOException {
     Configuration conf = new Configuration();
     uploadConfiguration(conf, "core-site.xml");