You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/28 04:14:05 UTC
git commit: YARN-2502. Changed DistributedShell to support node
labels. Contributed by Wangda Tan (cherry picked from commit
f6b963fdfc517429149165e4bb6fb947be6e3c99)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 296c4064f -> a1c7a707c
YARN-2502. Changed DistributedShell to support node labels. Contributed by Wangda Tan
(cherry picked from commit f6b963fdfc517429149165e4bb6fb947be6e3c99)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a1c7a707
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a1c7a707
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a1c7a707
Branch: refs/heads/branch-2
Commit: a1c7a707cac478b7288dfc34cd2fccecb68d0c35
Parents: 296c406
Author: Jian He <ji...@apache.org>
Authored: Mon Oct 27 20:13:00 2014 -0700
Committer: Jian He <ji...@apache.org>
Committed: Mon Oct 27 20:13:53 2014 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../applications/distributedshell/Client.java | 16 +-
.../distributedshell/TestDistributedShell.java | 163 +++++++++++++++++--
3 files changed, 163 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1c7a707/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b1bd6eb..9c08db7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -374,6 +374,9 @@ Release 2.6.0 - UNRELEASED
sake of localization and log-aggregation for long-running services. (Jian He
via vinodkv)
+ YARN-2502. Changed DistributedShell to support node labels. (Wangda Tan via
+ jianhe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1c7a707/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 2067aca..0e9a4e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -115,7 +115,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
public class Client {
private static final Log LOG = LogFactory.getLog(Client.class);
-
+
// Configuration
private Configuration conf;
private YarnClient yarnClient;
@@ -152,6 +152,7 @@ public class Client {
private int containerVirtualCores = 1;
// No. of containers in which the shell script needs to be executed
private int numContainers = 1;
+ private String nodeLabelExpression = null;
// log4j.properties file
// if available, add to local resources and set into classpath
@@ -280,7 +281,12 @@ public class Client {
opts.addOption("create", false, "Flag to indicate whether to create the "
+ "domain specified with -domain.");
opts.addOption("help", false, "Print usage");
-
+ opts.addOption("node_label_expression", true,
+ "Node label expression to determine the nodes"
+ + " where all the containers of this application"
+ + " will be allocated, \"\" means containers"
+ + " can be allocated anywhere, if you don't specify the option,"
+ + " default node_label_expression of queue will be used.");
}
/**
@@ -391,6 +397,7 @@ public class Client {
containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+
if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
@@ -399,6 +406,8 @@ public class Client {
+ ", containerVirtualCores=" + containerVirtualCores
+ ", numContainer=" + numContainers);
}
+
+ nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
@@ -617,6 +626,9 @@ public class Client {
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
vargs.add("--num_containers " + String.valueOf(numContainers));
+ if (null != nodeLabelExpression) {
+ appContext.setNodeLabelExpression(nodeLabelExpression);
+ }
vargs.add("--priority " + String.valueOf(shellCmdPriority));
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1c7a707/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 2414d4d..0ded5bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -30,7 +30,9 @@ import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -42,6 +44,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -49,40 +52,81 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.ImmutableMap;
+
public class TestDistributedShell {
private static final Log LOG =
LogFactory.getLog(TestDistributedShell.class);
protected MiniYARNCluster yarnCluster = null;
- protected Configuration conf = new YarnConfiguration();
+ private int numNodeManager = 1;
+
+ private YarnConfiguration conf = null;
protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class);
+
+ private void initializeNodeLabels() throws IOException {
+ RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext();
+
+ // Setup node labels
+ RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
+ Set<String> labels = new HashSet<String>();
+ labels.add("x");
+ labelsMgr.addToCluserNodeLabels(labels);
+
+ // Setup queue access to node labels
+ conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
+ conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
+ conf.set(
+ "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
+ "100");
+
+ rmContext.getScheduler().reinitialize(conf, rmContext);
+
+ // Fetch node-ids from yarn cluster
+ NodeId[] nodeIds = new NodeId[numNodeManager];
+ for (int i = 0; i < numNodeManager; i++) {
+ NodeManager mgr = this.yarnCluster.getNodeManager(i);
+ nodeIds[i] = mgr.getNMContext().getNodeId();
+ }
+
+ // Set label x to NM[1]
+ labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
+ }
@Before
public void setup() throws Exception {
LOG.info("Starting up YARN cluster");
+
+ conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
- conf.setClass(YarnConfiguration.RM_SCHEDULER,
- FifoScheduler.class, ResourceScheduler.class);
conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
+ numNodeManager = 2;
+
if (yarnCluster == null) {
- yarnCluster = new MiniYARNCluster(
- TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true);
+ yarnCluster =
+ new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
+ numNodeManager, 1, 1, true);
yarnCluster.init(conf);
+
yarnCluster.start();
- NodeManager nm = yarnCluster.getNodeManager(0);
- waitForNMToRegister(nm);
+
+ waitForNMsToRegister();
+
+ // Currently only the capacity scheduler supports node labels.
+ initializeNodeLabels();
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
if (url == null) {
@@ -757,13 +801,15 @@ public class TestDistributedShell {
}
}
- protected static void waitForNMToRegister(NodeManager nm)
- throws Exception {
- int attempt = 60;
- ContainerManagerImpl cm =
- ((ContainerManagerImpl) nm.getNMContext().getContainerManager());
- while (cm.getBlockNewContainerRequestsStatus() && attempt-- > 0) {
- Thread.sleep(2000);
+ protected void waitForNMsToRegister() throws Exception {
+ int sec = 60;
+ while (sec >= 0) {
+ if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
+ >= numNodeManager) {
+ break;
+ }
+ Thread.sleep(1000);
+ sec--;
}
}
@@ -892,5 +938,88 @@ public class TestDistributedShell {
}
return numOfWords;
}
+
+ @Test(timeout=90000)
+ public void testDSShellWithNodeLabelExpression() throws Exception {
+ // Start NMContainerMonitor
+ NMContainerMonitor mon = new NMContainerMonitor();
+ Thread t = new Thread(mon);
+ t.start();
+
+ // Submit a job which will sleep for 15 sec
+ String[] args = {
+ "--jar",
+ APPMASTER_JAR,
+ "--num_containers",
+ "4",
+ "--shell_command",
+ "sleep",
+ "--shell_args",
+ "15",
+ "--master_memory",
+ "512",
+ "--master_vcores",
+ "2",
+ "--container_memory",
+ "128",
+ "--container_vcores",
+ "1",
+ "--node_label_expression",
+ "x"
+ };
+
+ LOG.info("Initializing DS Client");
+ final Client client =
+ new Client(new Configuration(yarnCluster.getConfig()));
+ boolean initSuccess = client.init(args);
+ Assert.assertTrue(initSuccess);
+ LOG.info("Running DS Client");
+ boolean result = client.run();
+ LOG.info("Client run completed. Result=" + result);
+
+ t.interrupt();
+
+ // Check the maximum number of containers observed on each NM
+ int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
+ // Check no container allocated on NM[0]
+ Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
+ // Check there're some containers allocated on NM[1]
+ Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
+ }
+
+ /**
+ * Monitor containers running on NMs
+ */
+ private class NMContainerMonitor implements Runnable {
+ // Sampling interval in milliseconds (500ms)
+ final static int SAMPLING_INTERVAL_MS = 500;
+
+ // The maximum number of containers seen running on each NM
+ int[] maxRunningContainersOnNMs = new int[numNodeManager];
+
+ @Override
+ public void run() {
+ while (true) {
+ for (int i = 0; i < numNodeManager; i++) {
+ int nContainers =
+ yarnCluster.getNodeManager(i).getNMContext().getContainers()
+ .size();
+ if (nContainers > maxRunningContainersOnNMs[i]) {
+ maxRunningContainersOnNMs[i] = nContainers;
+ }
+ }
+ try {
+ Thread.sleep(SAMPLING_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ break;
+ }
+ }
+ }
+
+ public int[] getMaxRunningContainersReport() {
+ return maxRunningContainersOnNMs;
+ }
+ }
}