You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/03/28 20:32:01 UTC
hadoop git commit: YARN-998. Keep NM resource updated through dynamic
resource config for RM/NM restart. Contributed by Junping Du (cherry picked
from commit c7d843af3b1adde602fe0335f52ffa28b0371bf4)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 d71843558 -> 0c84f9aee
YARN-998. Keep NM resource updated through dynamic resource config for RM/NM restart. Contributed by Junping Du
(cherry picked from commit c7d843af3b1adde602fe0335f52ffa28b0371bf4)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c84f9ae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c84f9ae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c84f9ae
Branch: refs/heads/branch-2.8
Commit: 0c84f9aee2c408d6d79c12697129307cc8f97f01
Parents: d718435
Author: Jian He <ji...@apache.org>
Authored: Mon Mar 28 11:12:33 2016 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Mon Mar 28 11:28:59 2016 -0700
----------------------------------------------------------------------
.../server/resourcemanager/AdminService.java | 40 +++++-----
.../resourcemanager/ResourceTrackerService.java | 80 ++++++++++++++++----
.../resource/DynamicResourceConfiguration.java | 13 ++--
.../resourcemanager/TestRMAdminService.java | 78 ++++++++++++++++++-
4 files changed, 166 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c84f9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 80ff3cc..36c6098 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -622,34 +622,32 @@ public class AdminService extends CompositeService implements
try {
Configuration conf = getConfig();
Configuration configuration = new Configuration(conf);
- DynamicResourceConfiguration newconf;
-
- InputStream DRInputStream =
- this.rmContext.getConfigurationProvider()
- .getConfigurationInputStream(configuration,
- YarnConfiguration.DR_CONFIGURATION_FILE);
- if (DRInputStream != null) {
- configuration.addResource(DRInputStream);
- newconf = new DynamicResourceConfiguration(configuration, false);
+ DynamicResourceConfiguration newConf;
+
+ InputStream drInputStream =
+ this.rmContext.getConfigurationProvider().getConfigurationInputStream(
+ configuration, YarnConfiguration.DR_CONFIGURATION_FILE);
+
+ if (drInputStream != null) {
+ newConf = new DynamicResourceConfiguration(configuration,
+ drInputStream);
} else {
- newconf = new DynamicResourceConfiguration(configuration, true);
+ newConf = new DynamicResourceConfiguration(configuration);
}
- if (newconf.getNodes() == null || newconf.getNodes().length == 0) {
- RMAuditLogger.logSuccess(user.getShortUserName(), argName,
- "AdminService");
- return response;
- } else {
+ if (newConf.getNodes() != null && newConf.getNodes().length != 0) {
Map<NodeId, ResourceOption> nodeResourceMap =
- newconf.getNodeResourceMap();
-
+ newConf.getNodeResourceMap();
UpdateNodeResourceRequest updateRequest =
- UpdateNodeResourceRequest.newInstance(nodeResourceMap);
+ UpdateNodeResourceRequest.newInstance(nodeResourceMap);
updateNodeResource(updateRequest);
- RMAuditLogger.logSuccess(user.getShortUserName(), argName,
- "AdminService");
- return response;
}
+ // refresh dynamic resource in ResourceTrackerService
+ this.rmContext.getResourceTrackerService().
+ updateDynamicResourceConfiguration(newConf);
+ RMAuditLogger.logSuccess(user.getShortUserName(), argName,
+ "AdminService");
+ return response;
} catch (IOException ioe) {
throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c84f9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index bd24b25..f52b5ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@@ -105,6 +107,7 @@ public class ResourceTrackerService extends AbstractService implements
private boolean isDistributedNodeLabelsConf;
private boolean isDelegatedCentralizedNodeLabelsConf;
+ private volatile DynamicResourceConfiguration drConf;
public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
@@ -139,11 +142,11 @@ public class ResourceTrackerService extends AbstractService implements
}
minAllocMb = conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
minAllocVcores = conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
minimumNodeManagerVersion = conf.get(
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
@@ -156,9 +159,42 @@ public class ResourceTrackerService extends AbstractService implements
YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
}
+ loadDynamicResourceConfiguration(conf);
+
super.serviceInit(conf);
}
+ /**
+ * Load DynamicResourceConfiguration from dynamic-resources.xml.
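+ * @param conf configuration handed to the configuration provider to locate the file
+ * @throws IOException if loading the dynamic resource configuration fails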
+ */
+ public void loadDynamicResourceConfiguration(Configuration conf)
+ throws IOException {
+ try {
+ // load dynamic-resources.xml
+ InputStream drInputStream = this.rmContext.getConfigurationProvider()
+ .getConfigurationInputStream(conf,
+ YarnConfiguration.DR_CONFIGURATION_FILE);
+ if (drInputStream != null) {
+ this.drConf = new DynamicResourceConfiguration(conf, drInputStream);
+ } else {
+ this.drConf = new DynamicResourceConfiguration(conf);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Update DynamicResourceConfiguration with new configuration.
+ * @param conf the new dynamic resource configuration that replaces the current one
+ */
+ public void updateDynamicResourceConfiguration(
+ DynamicResourceConfiguration conf) {
+ this.drConf = conf;
+ }
+
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
@@ -166,15 +202,14 @@ public class ResourceTrackerService extends AbstractService implements
// security is enabled, so no secretManager.
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
- this.server =
- rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
- conf, null,
- conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
- YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
-
+ this.server = rpc.getServer(
+ ResourceTracker.class, this, resourceTrackerAddress, conf, null,
+ conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
+
// Enable service authorization?
if (conf.getBoolean(
- CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
InputStream inputStream =
this.rmContext.getConfigurationProvider()
@@ -185,12 +220,12 @@ public class ResourceTrackerService extends AbstractService implements
}
refreshServiceAcls(conf, RMPolicyProvider.getInstance());
}
-
+
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
- YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
- server.getListenerAddress());
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ server.getListenerAddress());
}
@Override
@@ -290,6 +325,19 @@ public class ResourceTrackerService extends AbstractService implements
return response;
}
+ // check if node's capacity is loaded from dynamic-resources.xml
+ String[] nodes = this.drConf.getNodes();
+ String nid = nodeId.toString();
+
+ if (nodes != null && Arrays.asList(nodes).contains(nid)) {
+ capability.setMemory(this.drConf.getMemoryPerNode(nid));
+ capability.setVirtualCores(this.drConf.getVcoresPerNode(nid));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Resource for node: " + nid + " is adjusted to " +
+ capability + " due to settings in dynamic-resources.xml.");
+ }
+ }
+
// Check if this node has minimum allocations
if (capability.getMemory() < minAllocMb
|| capability.getVirtualCores() < minAllocVcores) {
@@ -306,7 +354,7 @@ public class ResourceTrackerService extends AbstractService implements
response.setContainerTokenMasterKey(containerTokenSecretManager
.getCurrentKey());
response.setNMTokenMasterKey(nmTokenSecretManager
- .getCurrentKey());
+ .getCurrentKey());
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), capability, nodeManagerVersion);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c84f9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java
index dd37801..045c7bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.resource;
+import java.io.InputStream;
+
import java.util.HashMap;
import java.util.Map;
@@ -38,8 +40,6 @@ public class DynamicResourceConfiguration extends Configuration {
private static final Log LOG =
LogFactory.getLog(DynamicResourceConfiguration.class);
- private static final String DR_CONFIGURATION_FILE = "dynamic-resources.xml";
-
@Private
public static final String PREFIX = "yarn.resource.dynamic.";
@@ -63,15 +63,14 @@ public class DynamicResourceConfiguration extends Configuration {
}
public DynamicResourceConfiguration(Configuration configuration) {
- this(configuration, true);
+ super(configuration);
+ addResource(YarnConfiguration.DR_CONFIGURATION_FILE);
}
public DynamicResourceConfiguration(Configuration configuration,
- boolean useLocalConfigurationProvider) {
+ InputStream drInputStream) {
super(configuration);
- if (useLocalConfigurationProvider) {
- addResource(DR_CONFIGURATION_FILE);
- }
+ addResource(drInputStream);
}
private String getNodePrefix(String node) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c84f9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index 639b955..4513cbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -27,7 +27,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -202,7 +204,7 @@ public class TestRMAdminService {
}
@Test
- public void testAdminRefreshNodesResourcesWithFileSystemBasedConfigurationProvider()
+ public void testRefreshNodesResourceWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
@@ -240,6 +242,75 @@ public class TestRMAdminService {
}
@Test
+ public void testResourcePersistentForNMRegistrationWithNewResource()
+ throws IOException, YarnException {
+ configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+ "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+
+ //upload default configurations
+ uploadDefaultConfiguration();
+
+ try {
+ rm = new MockRM(configuration);
+ rm.init(configuration);
+ rm.start();
+ rm.registerNode("h1:1234", 5120);
+ } catch(Exception ex) {
+ fail("Should not get any exceptions");
+ }
+
+ NodeId nid = ConverterUtils.toNodeId("h1:1234");
+ RMNode ni = rm.getRMContext().getRMNodes().get(nid);
+ Resource resource = ni.getTotalCapability();
+ Assert.assertEquals("<memory:5120, vCores:5>", resource.toString());
+
+ DynamicResourceConfiguration drConf =
+ new DynamicResourceConfiguration();
+ drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
+ drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
+ drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
+ uploadConfiguration(drConf, "dynamic-resources.xml");
+
+ rm.adminService.refreshNodesResources(
+ RefreshNodesResourcesRequest.newInstance());
+
+ try {
+ // register the same node again with a different resource.
+ // validate that this has no effect, as the resource is cached on the RM side.
+ rm.registerNode("h1:1234", 8192, 8);
+ } catch (Exception ex) {
+ fail("Should not get any exceptions");
+ }
+
+ RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
+ Resource resourceAfter = niAfter.getTotalCapability();
+ Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
+
+ // Replace original dr file with an empty dr file, and validate node
+ // registration with new resources will take effect now.
+ deleteOnRemoteFileSystem("dynamic-resources.xml");
+ DynamicResourceConfiguration emptyDRConf =
+ new DynamicResourceConfiguration();
+
+ uploadConfiguration(emptyDRConf, "dynamic-resources.xml");
+ rm.adminService.refreshNodesResources(
+ RefreshNodesResourcesRequest.newInstance());
+ try {
+ // register the same node a third time; this time the registered resource
+ // should work.
+ rm.registerNode("h1:1234", 8192, 8);
+ } catch (Exception ex) {
+ fail("Should not get any exceptions");
+ }
+
+ niAfter = rm.getRMContext().getRMNodes().get(nid);
+ resourceAfter = niAfter.getTotalCapability();
+ // new resource in registration should take effect as we have already
+ // emptied the dynamic resource file.
+ Assert.assertEquals("<memory:8192, vCores:8>", resourceAfter.toString());
+ }
+
+ @Test
public void testAdminAclsWithLocalConfigurationProvider() {
rm = new MockRM(configuration);
rm.init(configuration);
@@ -1006,6 +1077,11 @@ public class TestRMAdminService {
uploadToRemoteFileSystem(new Path(csConfFile));
}
+ private void deleteOnRemoteFileSystem(String fileName)
+ throws IOException {
+ fs.delete(new Path(workingPath, fileName));
+ }
+
private void uploadDefaultConfiguration() throws IOException {
Configuration conf = new Configuration();
uploadConfiguration(conf, "core-site.xml");