Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/28 04:13:35 UTC

git commit: YARN-2502. Changed DistributedShell to support node labels. Contributed by Wangda Tan

Repository: hadoop
Updated Branches:
  refs/heads/trunk b0e19c9d5 -> f6b963fdf


YARN-2502. Changed DistributedShell to support node labels. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f6b963fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f6b963fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f6b963fd

Branch: refs/heads/trunk
Commit: f6b963fdfc517429149165e4bb6fb947be6e3c99
Parents: b0e19c9
Author: Jian He <ji...@apache.org>
Authored: Mon Oct 27 20:13:00 2014 -0700
Committer: Jian He <ji...@apache.org>
Committed: Mon Oct 27 20:13:00 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../applications/distributedshell/Client.java   |  16 +-
 .../distributedshell/TestDistributedShell.java  | 163 +++++++++++++++++--
 3 files changed, 163 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b963fd/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b32132b..b1d25d0 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -404,6 +404,9 @@ Release 2.6.0 - UNRELEASED
     sake of localization and log-aggregation for long-running services. (Jian He
     via vinodkv)
 
+    YARN-2502. Changed DistributedShell to support node labels. (Wangda Tan via
+    jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b963fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 2067aca..0e9a4e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -115,7 +115,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 public class Client {
 
   private static final Log LOG = LogFactory.getLog(Client.class);
-
+  
   // Configuration
   private Configuration conf;
   private YarnClient yarnClient;
@@ -152,6 +152,7 @@ public class Client {
   private int containerVirtualCores = 1;
   // No. of containers in which the shell script needs to be executed
   private int numContainers = 1;
+  private String nodeLabelExpression = null;
 
   // log4j.properties file 
   // if available, add to local resources and set into classpath 
@@ -280,7 +281,12 @@ public class Client {
     opts.addOption("create", false, "Flag to indicate whether to create the "
         + "domain specified with -domain.");
     opts.addOption("help", false, "Print usage");
-
+    opts.addOption("node_label_expression", true,
+        "Node label expression to determine the nodes"
+            + " where all the containers of this application"
+            + " will be allocated, \"\" means containers"
+            + " can be allocated anywhere, if you don't specify the option,"
+            + " default node_label_expression of queue will be used.");
   }
 
   /**
@@ -391,6 +397,7 @@ public class Client {
     containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
     containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
     numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+    
 
     if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
       throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
@@ -399,6 +406,8 @@ public class Client {
           + ", containerVirtualCores=" + containerVirtualCores
           + ", numContainer=" + numContainers);
     }
+    
+    nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
 
     clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
 
@@ -617,6 +626,9 @@ public class Client {
     vargs.add("--container_memory " + String.valueOf(containerMemory));
     vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
     vargs.add("--num_containers " + String.valueOf(numContainers));
+    if (null != nodeLabelExpression) {
+      appContext.setNodeLabelExpression(nodeLabelExpression);
+    }
     vargs.add("--priority " + String.valueOf(shellCmdPriority));
 
     for (Map.Entry<String, String> entry : shellEnv.entrySet()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b963fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 2414d4d..0ded5bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -30,7 +30,9 @@ import java.net.InetAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -42,6 +44,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -49,40 +52,81 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 public class TestDistributedShell {
 
   private static final Log LOG =
       LogFactory.getLog(TestDistributedShell.class);
 
   protected MiniYARNCluster yarnCluster = null;
-  protected Configuration conf = new YarnConfiguration();
+  private int numNodeManager = 1;
+  
+  private YarnConfiguration conf = null;
 
   protected final static String APPMASTER_JAR =
       JarFinder.getJar(ApplicationMaster.class);
+  
+  private void initializeNodeLabels() throws IOException {
+    RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext();
+
+    // Setup node labels
+    RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
+    Set<String> labels = new HashSet<String>();
+    labels.add("x");
+    labelsMgr.addToCluserNodeLabels(labels);
+
+    // Setup queue access to node labels
+    conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
+    conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
+    conf.set(
+        "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
+        "100");
+
+    rmContext.getScheduler().reinitialize(conf, rmContext);
+
+    // Fetch node-ids from yarn cluster
+    NodeId[] nodeIds = new NodeId[numNodeManager];
+    for (int i = 0; i < numNodeManager; i++) {
+      NodeManager mgr = this.yarnCluster.getNodeManager(i);
+      nodeIds[i] = mgr.getNMContext().getNodeId();
+    }
+
+    // Set label x to NM[1]
+    labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
+  }
 
   @Before
   public void setup() throws Exception {
     LOG.info("Starting up YARN cluster");
+    
+    conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, 
-        FifoScheduler.class, ResourceScheduler.class);
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
+    numNodeManager = 2;
+    
     if (yarnCluster == null) {
-      yarnCluster = new MiniYARNCluster(
-        TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true);
+      yarnCluster =
+          new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
+              numNodeManager, 1, 1, true);
       yarnCluster.init(conf);
+      
       yarnCluster.start();
-      NodeManager  nm = yarnCluster.getNodeManager(0);
-      waitForNMToRegister(nm);
+      
+      waitForNMsToRegister();
+      
+      // currently only capacity scheduler support node labels,
+      initializeNodeLabels();
       
       URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
       if (url == null) {
@@ -757,13 +801,15 @@ public class TestDistributedShell {
     }
   }
 
-  protected static void waitForNMToRegister(NodeManager nm)
-      throws Exception {
-    int attempt = 60;
-    ContainerManagerImpl cm =
-        ((ContainerManagerImpl) nm.getNMContext().getContainerManager());
-    while (cm.getBlockNewContainerRequestsStatus() && attempt-- > 0) {
-      Thread.sleep(2000);
+  protected void waitForNMsToRegister() throws Exception {
+    int sec = 60;
+    while (sec >= 0) {
+      if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() 
+          >= numNodeManager) {
+        break;
+      }
+      Thread.sleep(1000);
+      sec--;
     }
   }
 
@@ -892,5 +938,88 @@ public class TestDistributedShell {
     }
     return numOfWords;
   }
+  
+  @Test(timeout=90000)
+  public void testDSShellWithNodeLabelExpression() throws Exception {
+    // Start NMContainerMonitor
+    NMContainerMonitor mon = new NMContainerMonitor();
+    Thread t = new Thread(mon);
+    t.start();
+
+    // Submit a job which will sleep for 60 sec
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "4",
+        "--shell_command",
+        "sleep",
+        "--shell_args",
+        "15",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--node_label_expression",
+        "x"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client =
+        new Client(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+    
+    t.interrupt();
+    
+    // Check maximum number of containers on each NMs
+    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
+    // Check no container allocated on NM[0]
+    Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
+    // Check there're some containers allocated on NM[1]
+    Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
+  }
+  
+  /**
+   * Monitor containers running on NMs
+   */
+  private class NMContainerMonitor implements Runnable {
+    // The interval of milliseconds of sampling (500ms)
+    final static int SAMPLING_INTERVAL_MS = 500;
+    
+    // The maximum number of containers running on each NMs
+    int[] maxRunningContainersOnNMs = new int[numNodeManager];
+
+    @Override
+    public void run() {
+      while (true) {
+        for (int i = 0; i < numNodeManager; i++) {
+          int nContainers =
+              yarnCluster.getNodeManager(i).getNMContext().getContainers()
+                  .size();
+          if (nContainers > maxRunningContainersOnNMs[i]) {
+            maxRunningContainersOnNMs[i] = nContainers;
+          }
+        }
+        try {
+          Thread.sleep(SAMPLING_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+    
+    public int[] getMaxRunningContainersReport() {
+      return maxRunningContainersOnNMs;
+    }
+  }
 }
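
Taken together with the test above, end-to-end usage amounts to passing -node_label_expression when launching DistributedShell. A minimal sketch of driving the modified Client programmatically, assuming a cluster whose configuration already defines label "x" and maps it to at least one NodeManager; the jar path is a placeholder.

import org.apache.hadoop.yarn.applications.distributedshell.Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RunDistributedShellWithLabel {
  public static void main(String[] unused) throws Exception {
    String[] args = {
        "--jar", "/path/to/hadoop-yarn-applications-distributedshell.jar", // placeholder path
        "--shell_command", "sleep",
        "--shell_args", "15",
        "--num_containers", "4",
        "--container_memory", "128",
        "--node_label_expression", "x"
    };

    Client client = new Client(new YarnConfiguration());
    if (client.init(args)) {
      // run() blocks until the application finishes and reports success or failure.
      boolean success = client.run();
      System.out.println("DistributedShell completed, success=" + success);
    }
  }
}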