You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/11/21 04:47:02 UTC

svn commit: r1544023 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ hadoop-yarn/hadoop-yarn-applicatio...

Author: vinodkv
Date: Thu Nov 21 03:47:02 2013
New Revision: 1544023

URL: http://svn.apache.org/r1544023
Log:
YARN-1303. Fixed DistributedShell to not fail with multiple commands separated by a semi-colon as shell-command. Contributed by Xuan Gong.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1544023&r1=1544022&r2=1544023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Nov 21 03:47:02 2013
@@ -114,6 +114,9 @@ Release 2.3.0 - UNRELEASED
     YARN-584. In scheduler web UIs, queues unexpand on refresh. (Harshit
     Daga via Sandy Ryza)
 
+    YARN-1303. Fixed DistributedShell to not fail with multiple commands separated
+    by a semi-colon as shell-command. (Xuan Gong via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1544023&r1=1544022&r2=1544023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Thu Nov 21 03:47:02 2013
@@ -19,8 +19,9 @@
 package org.apache.hadoop.yarn.applications.distributedshell;
 
 import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.StringReader;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -376,7 +377,16 @@ public class ApplicationMaster {
       throw new IllegalArgumentException(
           "No shell command specified to be executed by application master");
     }
-    shellCommand = cliParser.getOptionValue("shell_command");
+    String shellCommandPath = cliParser.getOptionValue("shell_command");
+    FileInputStream fs = null;
+    DataInputStream ds = null;
+    try {
+      ds = new DataInputStream(new FileInputStream(shellCommandPath));
+      shellCommand = ds.readUTF();
+    } finally {
+      org.apache.commons.io.IOUtils.closeQuietly(ds);
+      org.apache.commons.io.IOUtils.closeQuietly(fs);
+    }
 
     if (cliParser.hasOption("shell_args")) {
       shellArgs = cliParser.getOptionValue("shell_args");

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1544023&r1=1544022&r2=1544023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Thu Nov 21 03:47:02 2013
@@ -32,14 +32,17 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -131,6 +134,7 @@ public class Client {
 
   // Shell command to be executed 
   private String shellCommand = ""; 
+  private final String shellCommandPath = "shellCommands";
   // Location of shell script 
   private String shellScriptPath = ""; 
   // Args to be passed to the shell command
@@ -483,6 +487,29 @@ public class Client {
       hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
     }
 
+    if (!shellCommand.isEmpty()) {
+      String shellCommandSuffix =
+          appName + "/" + appId.getId() + "/" + shellCommandPath;
+      Path shellCommandDst =
+          new Path(fs.getHomeDirectory(), shellCommandSuffix);
+      FSDataOutputStream ostream = null;
+      try {
+        ostream = FileSystem
+            .create(fs, shellCommandDst, new FsPermission((short) 0710));
+        ostream.writeUTF(shellCommand);
+      } finally {
+        IOUtils.closeQuietly(ostream);
+      }
+      FileStatus scFileStatus = fs.getFileStatus(shellCommandDst);
+      LocalResource scRsrc = Records.newRecord(LocalResource.class);
+      scRsrc.setType(LocalResourceType.FILE);
+      scRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+      scRsrc.setResource(ConverterUtils.getYarnUrlFromURI(shellCommandDst
+          .toUri()));
+      scRsrc.setTimestamp(scFileStatus.getModificationTime());
+      scRsrc.setSize(scFileStatus.getLen());
+      localResources.put("shellCommands", scRsrc);
+    }
     // Set local resource info into app master container launch context
     amContainer.setLocalResources(localResources);
 
@@ -541,8 +568,9 @@ public class Client {
     vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
     vargs.add("--num_containers " + String.valueOf(numContainers));
     vargs.add("--priority " + String.valueOf(shellCmdPriority));
+
     if (!shellCommand.isEmpty()) {
-      vargs.add("--shell_command " + shellCommand + "");
+      vargs.add("--shell_command " + shellCommandPath + "");
     }
     if (!shellArgs.isEmpty()) {
       vargs.add("--shell_args " + shellArgs + "");

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1544023&r1=1544022&r2=1544023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Thu Nov 21 03:47:02 2013
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.yarn.applications.distributedshell;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -171,6 +174,38 @@ public class TestDistributedShell {
   }
 
   @Test(timeout=90000)
+  public void testDSShellWithCommands() throws Exception {
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        "echo HADOOP YARN MAPREDUCE|wc -w",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client =
+        new Client(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+    List<String> expectedContent = new ArrayList<String>();
+    expectedContent.add("3");
+    verifyContainerLog(2, expectedContent);
+  }
+
+  @Test(timeout=90000)
   public void testDSShellWithInvalidArgs() throws Exception {
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
 
@@ -332,5 +367,56 @@ public class TestDistributedShell {
     LOG.info("Running DS Client");
     Assert.assertTrue(client.run());
   }
+
+  private void
+      verifyContainerLog(int containerNum, List<String> expectedContent) {
+    File logFolder =
+        new File(yarnCluster.getNodeManager(0).getConfig()
+            .get(YarnConfiguration.NM_LOG_DIRS,
+                YarnConfiguration.DEFAULT_NM_LOG_DIRS));
+
+    File[] listOfFiles = logFolder.listFiles();
+    int currentContainerLogFileIndex = -1;
+    for (int i = listOfFiles.length - 1; i >= 0; i--) {
+      if (listOfFiles[i].listFiles().length == containerNum + 1) {
+        currentContainerLogFileIndex = i;
+        break;
+      }
+    }
+    Assert.assertTrue(currentContainerLogFileIndex != -1);
+    File[] containerFiles =
+        listOfFiles[currentContainerLogFileIndex].listFiles();
+
+    for (int i = 0; i < containerFiles.length; i++) {
+      for (File output : containerFiles[i].listFiles()) {
+        if (output.getName().trim().equalsIgnoreCase("stdout")) {
+          BufferedReader br = null;
+          try {
+
+            String sCurrentLine;
+
+            br = new BufferedReader(new FileReader(output));
+            int numOfline = 0;
+            while ((sCurrentLine = br.readLine()) != null) {
+              Assert.assertEquals("The current is" + sCurrentLine,
+                  expectedContent.get(numOfline), sCurrentLine.trim());
+              numOfline++;
+            }
+
+          } catch (IOException e) {
+            e.printStackTrace();
+          } finally {
+            try {
+              if (br != null)
+                br.close();
+            } catch (IOException ex) {
+              ex.printStackTrace();
+            }
+          }
+        }
+      }
+    }
+  }
+
 }