You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/09 17:54:29 UTC

[27/30] hadoop git commit: YARN-2607. Fixed issues in TestDistributedShell. Contributed by Wangda Tan.

YARN-2607. Fixed issues in TestDistributedShell. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/737d9284
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/737d9284
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/737d9284

Branch: refs/heads/HDFS-EC
Commit: 737d9284c109dac20ff423f30c62f6abe2db10f7
Parents: 9a4e0d3
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Sat Nov 8 11:00:57 2014 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Sat Nov 8 11:00:57 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../distributedshell/TestDistributedShell.java  | 140 +---------------
 .../TestDistributedShellWithNodeLabels.java     | 165 +++++++++++++++++++
 3 files changed, 176 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/737d9284/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 748ffe0..9adfb8c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -920,6 +920,8 @@ Release 2.6.0 - UNRELEASED
     YARN-2826. Fixed user-groups mappings' refresh bug caused by YARN-2826.
     (Wangda Tan via vinodkv)
 
+    YARN-2607. Fixed issues in TestDistributedShell. (Wangda Tan via vinodkv)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/737d9284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index eb0fb94..1d3a104 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -30,9 +30,7 @@ import java.net.InetAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -44,70 +42,37 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableMap;
-
 public class TestDistributedShell {
 
   private static final Log LOG =
       LogFactory.getLog(TestDistributedShell.class);
 
-  protected MiniYARNCluster yarnCluster = null;
-  private int numNodeManager = 1;
-  
-  private YarnConfiguration conf = null;
+  protected MiniYARNCluster yarnCluster = null;  
+  protected YarnConfiguration conf = null;
+  private static final int NUM_NMS = 1;
 
   protected final static String APPMASTER_JAR =
       JarFinder.getJar(ApplicationMaster.class);
-  
-  private void initializeNodeLabels() throws IOException {
-    RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext();
-
-    // Setup node labels
-    RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
-    Set<String> labels = new HashSet<String>();
-    labels.add("x");
-    labelsMgr.addToCluserNodeLabels(labels);
-
-    // Setup queue access to node labels
-    conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
-    conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
-        "100");
-    conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
-    conf.set(
-        "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
-        "100");
-
-    rmContext.getScheduler().reinitialize(conf, rmContext);
-
-    // Fetch node-ids from yarn cluster
-    NodeId[] nodeIds = new NodeId[numNodeManager];
-    for (int i = 0; i < numNodeManager; i++) {
-      NodeManager mgr = this.yarnCluster.getNodeManager(i);
-      nodeIds[i] = mgr.getNMContext().getNodeId();
-    }
-
-    // Set label x to NM[1]
-    labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
-  }
 
   @Before
   public void setup() throws Exception {
+    setupInternal(NUM_NMS);
+  }
+
+  protected void setupInternal(int numNodeManager) throws Exception {
+
     LOG.info("Starting up YARN cluster");
     
     conf = new YarnConfiguration();
@@ -115,7 +80,6 @@ public class TestDistributedShell {
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
-    numNodeManager = 2;
     
     if (yarnCluster == null) {
       yarnCluster =
@@ -127,9 +91,6 @@ public class TestDistributedShell {
       
       waitForNMsToRegister();
       
-      // currently only capacity scheduler support node labels,
-      initializeNodeLabels();
-      
       URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
       if (url == null) {
         throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
@@ -807,7 +768,7 @@ public class TestDistributedShell {
     int sec = 60;
     while (sec >= 0) {
       if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() 
-          >= numNodeManager) {
+          >= NUM_NMS) {
         break;
       }
       Thread.sleep(1000);
@@ -940,88 +901,5 @@ public class TestDistributedShell {
     }
     return numOfWords;
   }
-  
-  @Test(timeout=90000)
-  public void testDSShellWithNodeLabelExpression() throws Exception {
-    // Start NMContainerMonitor
-    NMContainerMonitor mon = new NMContainerMonitor();
-    Thread t = new Thread(mon);
-    t.start();
-
-    // Submit a job which will sleep for 60 sec
-    String[] args = {
-        "--jar",
-        APPMASTER_JAR,
-        "--num_containers",
-        "4",
-        "--shell_command",
-        "sleep",
-        "--shell_args",
-        "15",
-        "--master_memory",
-        "512",
-        "--master_vcores",
-        "2",
-        "--container_memory",
-        "128",
-        "--container_vcores",
-        "1",
-        "--node_label_expression",
-        "x"
-    };
-
-    LOG.info("Initializing DS Client");
-    final Client client =
-        new Client(new Configuration(yarnCluster.getConfig()));
-    boolean initSuccess = client.init(args);
-    Assert.assertTrue(initSuccess);
-    LOG.info("Running DS Client");
-    boolean result = client.run();
-    LOG.info("Client run completed. Result=" + result);
-    
-    t.interrupt();
-    
-    // Check maximum number of containers on each NMs
-    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
-    // Check no container allocated on NM[0]
-    Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
-    // Check there're some containers allocated on NM[1]
-    Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
-  }
-  
-  /**
-   * Monitor containers running on NMs
-   */
-  private class NMContainerMonitor implements Runnable {
-    // The interval of milliseconds of sampling (500ms)
-    final static int SAMPLING_INTERVAL_MS = 500;
-    
-    // The maximum number of containers running on each NMs
-    int[] maxRunningContainersOnNMs = new int[numNodeManager];
-
-    @Override
-    public void run() {
-      while (true) {
-        for (int i = 0; i < numNodeManager; i++) {
-          int nContainers =
-              yarnCluster.getNodeManager(i).getNMContext().getContainers()
-                  .size();
-          if (nContainers > maxRunningContainersOnNMs[i]) {
-            maxRunningContainersOnNMs[i] = nContainers;
-          }
-        }
-        try {
-          Thread.sleep(SAMPLING_INTERVAL_MS);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-          break;
-        }
-      }
-    }
-    
-    public int[] getMaxRunningContainersReport() {
-      return maxRunningContainersOnNMs;
-    }
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/737d9284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
new file mode 100644
index 0000000..c04b7fe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestDistributedShellWithNodeLabels {
+  private static final Log LOG =
+      LogFactory.getLog(TestDistributedShellWithNodeLabels.class);
+  
+  static final int NUM_NMS = 2;
+  TestDistributedShell distShellTest;
+ 
+  @Before
+  public void setup() throws Exception {
+    distShellTest = new TestDistributedShell();
+    distShellTest.setupInternal(NUM_NMS);
+  }
+  
+  private void initializeNodeLabels() throws IOException {
+    RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
+
+    // Setup node labels
+    RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
+    Set<String> labels = new HashSet<String>();
+    labels.add("x");
+    labelsMgr.addToCluserNodeLabels(labels);
+
+    // Setup queue access to node labels
+    distShellTest.conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
+    distShellTest.conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
+        "100");
+    distShellTest.conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
+    distShellTest.conf.set(
+        "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
+        "100");
+
+    rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext);
+
+    // Fetch node-ids from yarn cluster
+    NodeId[] nodeIds = new NodeId[NUM_NMS];
+    for (int i = 0; i < NUM_NMS; i++) {
+      NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i);
+      nodeIds[i] = mgr.getNMContext().getNodeId();
+    }
+
+    // Set label x to NM[1]
+    labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
+  }
+  
+  @Test(timeout=90000)
+  public void testDSShellWithNodeLabelExpression() throws Exception {
+    initializeNodeLabels();
+    
+    // Start NMContainerMonitor
+    NMContainerMonitor mon = new NMContainerMonitor();
+    Thread t = new Thread(mon);
+    t.start();
+
+    // Submit a job which will sleep for 60 sec
+    String[] args = {
+        "--jar",
+        TestDistributedShell.APPMASTER_JAR,
+        "--num_containers",
+        "4",
+        "--shell_command",
+        "sleep",
+        "--shell_args",
+        "15",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--node_label_expression",
+        "x"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client =
+        new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+    
+    t.interrupt();
+    
+    // Check maximum number of containers on each NMs
+    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
+    // Check no container allocated on NM[0]
+    Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
+    // Check there're some containers allocated on NM[1]
+    Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
+  }
+  
+  /**
+   * Monitor containers running on NMs
+   */
+  class NMContainerMonitor implements Runnable {
+    // The interval of milliseconds of sampling (500ms)
+    final static int SAMPLING_INTERVAL_MS = 500;
+
+    // The maximum number of containers running on each NMs
+    int[] maxRunningContainersOnNMs = new int[NUM_NMS];
+
+    @Override
+    public void run() {
+      while (true) {
+        for (int i = 0; i < NUM_NMS; i++) {
+          int nContainers =
+              distShellTest.yarnCluster.getNodeManager(i).getNMContext()
+                  .getContainers().size();
+          if (nContainers > maxRunningContainersOnNMs[i]) {
+            maxRunningContainersOnNMs[i] = nContainers;
+          }
+        }
+        try {
+          Thread.sleep(SAMPLING_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+
+    public int[] getMaxRunningContainersReport() {
+      return maxRunningContainersOnNMs;
+    }
+  }
+}