You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/11/20 16:14:07 UTC

git commit: Improve parameters parsing/handling - memory parameters - other JVM parameters

Updated Branches:
  refs/heads/S4-25 15c617326 -> b590862eb


Improve parameters parsing/handling
- memory parameters
- other JVM parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/b590862e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/b590862e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/b590862e

Branch: refs/heads/S4-25
Commit: b590862ebc7e3e77f3365179545ade1f3a958e74
Parents: 15c6173
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Nov 19 18:22:04 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Nov 20 16:31:30 2012 +0100

----------------------------------------------------------------------
 .../apache/s4/tools/yarn/AppMasterYarnArgs.java    |   12 +
 .../org/apache/s4/tools/yarn/CommonS4YarnArgs.java |   60 +++++
 .../apache/s4/tools/yarn/S4ApplicationMaster.java  |  194 +++++----------
 .../org/apache/s4/tools/yarn/S4CLIYarnArgs.java    |   63 +++++
 .../org/apache/s4/tools/yarn/S4YarnClient.java     |   56 +++--
 .../java/org/apache/s4/tools/yarn/YarnArgs.java    |   89 -------
 .../apache/s4/tools/yarn/TestYarnDeployment.java   |    3 +-
 7 files changed, 227 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/AppMasterYarnArgs.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/AppMasterYarnArgs.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/AppMasterYarnArgs.java
new file mode 100644
index 0000000..a3e17dc
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/AppMasterYarnArgs.java
@@ -0,0 +1,21 @@
+package org.apache.s4.tools.yarn;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/**
+ * Command-line arguments of the S4 YARN application master: the options shared with the S4 YARN client (inherited
+ * from {@link CommonS4YarnArgs}) plus an optional application attempt id.
+ */
+@Parameters(separators = "=")
+public class AppMasterYarnArgs extends CommonS4YarnArgs {
+
+    /**
+     * Application attempt id. {@link S4ApplicationMaster} only falls back to it when the container id is not
+     * available from the environment.
+     */
+    @Parameter(names = "-app_attempt_id", required = false,
+            description = "App Attempt ID. Not to be used unless for testing purposes")
+    String appAttemptId;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/CommonS4YarnArgs.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/CommonS4YarnArgs.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/CommonS4YarnArgs.java
new file mode 100644
index 0000000..222ac5a
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/CommonS4YarnArgs.java
@@ -0,0 +1,67 @@
+package org.apache.s4.tools.yarn;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.Main.InlineConfigParameterConverter;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command-line arguments shared by the S4 YARN client ({@link S4CLIYarnArgs}) and the S4 YARN application master
+ * ({@link AppMasterYarnArgs}).
+ * <p>
+ * Option names are exposed as constants so that they can be reused when building the command lines of the
+ * application master and of the S4 nodes.
+ */
+public class CommonS4YarnArgs {
+
+    public static final String NAMED_STRING_PARAMETERS = "-namedStringParameters";
+
+    public static final String EXTRA_MODULES_CLASSES = "-extraModulesClasses";
+
+    public static final String PRIORITY = "-priority";
+
+    public static final String S4_NODE_MEMORY = "-s4NodeMemory";
+
+    public static final String S4_NODE_JVM_PARAMETERS = "-s4NodeJVMParameters";
+
+    public static final String NB_S4_NODES = "-nbS4Nodes";
+    public static final String USER = "-user";
+
+    @Parameter(names = { "-cluster", "-c" }, description = "Name of the S4 logical cluster that will contain S4 nodes. NOTE: the cluster is currently defined automatically in ZooKeeper.", required = true)
+    String cluster;
+
+    @Parameter(names = "-zk", description = "S4 Zookeeper cluster manager connection string", required = true)
+    String zkString;
+
+    @Parameter(names = { S4_NODE_MEMORY, "-container_memory" }, description = "YARN parameter: Amount of memory in MB to be requested to host the S4 node", required = false, validateWith = S4CLIYarnArgs.MemoryValidator.class)
+    int containerMemory = 256;
+
+    @Parameter(names = PRIORITY, description = "YARN parameter: Application priority", required = false)
+    int priority = 0;
+
+    @Parameter(names = USER, description = "YARN parameter: User to run the application as", required = false)
+    String user = "";
+
+    @Parameter(names = { NB_S4_NODES, "-num_containers" }, description = "YARN parameter: Number of containers on which the S4 node needs to be hosted (typically: at least as many partitions as the logical cluster)", validateWith = S4CLIYarnArgs.NbContainersValidator.class)
+    int numContainers = 1;
+
+    @Parameter(names = "-debug", description = "YARN parameter: Dump out debug information")
+    boolean debug;
+
+    @Parameter(names = "-test", description = "Test mode")
+    boolean test;
+
+    @Parameter(names = { EXTRA_MODULES_CLASSES, "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false, hidden = false)
+    List<String> extraModulesClasses = new ArrayList<String>();
+
+    @Parameter(names = { NAMED_STRING_PARAMETERS, "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
+    List<String> extraNamedParameters = new ArrayList<String>();
+
+    // TODO support JVM parameters that contain commas: the values of this option are given as a comma-separated list
+    @Parameter(names = S4_NODE_JVM_PARAMETERS, description = "Extra JVM parameters for running the nodes, specified as a comma separated list. The memory is usually configured through "
+            + S4_NODE_MEMORY, required = false)
+    List<String> extraS4NodeJVMParams = new ArrayList<String>();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
index 7f025dd..c3e847f 100644
--- a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
@@ -30,12 +30,6 @@ import java.util.Vector;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -72,10 +66,12 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.s4.deploy.HdfsFetcherModule;
+import org.apache.s4.tools.Tools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 
 /**
  * An ApplicationMaster for launching S4 nodes on a set of launched containers using the YARN framework.
@@ -109,14 +105,6 @@ public class S4ApplicationMaster {
     // Tracking url to which app master publishes info for clients to monitor
     private String appMasterTrackingUrl = "";
 
-    // App Master configuration
-    // No. of containers to host S4 nodes on
-    private int numTotalContainers = 1;
-    // Memory to request for the container on which the S4 nodes will be hosted
-    private int containerMemory = 10;
-    // Priority of the request
-    private int requestPriority;
-
     // Incremental counter for rpc calls to the RM
     private AtomicInteger rmRequestID = new AtomicInteger();
 
@@ -134,24 +122,15 @@ public class S4ApplicationMaster {
     // Only request for more if the original requirement changes.
     private AtomicInteger numRequestedContainers = new AtomicInteger();
 
-    // The cluster (or application) name
-    private String cluster = "";
-
-    private String zkString = "";
-
     // Containers to be released
     private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
 
     // Launch threads
     private List<Thread> launchThreads = new ArrayList<Thread>();
 
-    private boolean testMode;
-
-    private String user;
-
-    private String extraModulesClasses;
+    private int containerMemory;
 
-    private String namedStringParameters;
+    private static AppMasterYarnArgs yarnArgs;
 
     /**
      * @param args
@@ -160,13 +139,15 @@ public class S4ApplicationMaster {
     public static void main(String[] args) {
         boolean result = false;
         try {
+            // init() reads the parsed arguments through the static yarnArgs field
+            yarnArgs = new AppMasterYarnArgs();
+            logger.info("S4YarnApplicationMaster args = " + Arrays.toString(args));
+
+            Tools.parseArgs(yarnArgs, args);

             S4ApplicationMaster appMaster = new S4ApplicationMaster();
             logger.info("Initializing ApplicationMaster with args " + Arrays.toString(args));
-            boolean doRun = appMaster.init(args);
-            if (!doRun) {
-                System.exit(0);
-            }
+            appMaster.init();
             result = appMaster.run();
         } catch (Throwable t) {
             t.printStackTrace();
@@ -195,48 +177,19 @@ public class S4ApplicationMaster {
     }

     /**
-     * Parse command line options
-     * 
-     * @param args
-     *            Command line args
-     * @return Whether init successful and run should be invoked
-     * @throws ParseException
-     * @throws IOException
+     * Initializes the application master from the YARN environment and from the parsed command-line arguments.
+     *
+     * @throws IllegalArgumentException
+     *             if no application attempt id is available, or if no S4 cluster name was specified
       */
-    public boolean init(String[] args) throws ParseException, IOException {
-
-        Options opts = new Options();
-        opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
-        opts.addOption("c", "cluster", true, "The cluster (or application) name");
-        opts.addOption("zk", true, "Zookeeper connection string");
-        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to host the S4 node");
-        opts.addOption("num_containers", true, "No. of containers on which the S4 node needs to be hosted");
-        opts.addOption("priority", true, "Application Priority. Default 0");
-        opts.addOption("debug", false, "Dump out debug information");
-        opts.addOption("test", false, "Test mode");
-        opts.addOption("extraModulesClasses", true, "Extra modules classes for S4 node configuration");
-        opts.addOption("namedStringParameters", true, "Named configuration parameters for S4 node configuration");
-
-        opts.addOption("help", false, "Print usage");
-        CommandLine cliParser = new GnuParser().parse(opts, args);
-
-        if (args.length == 0) {
-            printUsage(opts);
-            throw new IllegalArgumentException("No args specified for application master to initialize");
-        }
-
-        if (cliParser.hasOption("help")) {
-            printUsage(opts);
-            return false;
-        }
+    public void init() throws IOException {

         Map<String, String> envs = System.getenv();

         appAttemptID = Records.newRecord(ApplicationAttemptId.class);
         if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
-            if (cliParser.hasOption("app_attempt_id")) {
-                String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
-                appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
+            if (!Strings.isNullOrEmpty(yarnArgs.appAttemptId)) {
+                appAttemptID = ConverterUtils.toApplicationAttemptId(yarnArgs.appAttemptId);
             } else {
                 throw new IllegalArgumentException("Application Attempt Id not set in the environment");
             }
@@ -248,49 +201,18 @@ public class S4ApplicationMaster {
         logger.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
                 + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
                 + appAttemptID.getAttemptId());
-        logger.info("Application master args = " + Arrays.toString(cliParser.getArgs()));
-        for (Option option : cliParser.getOptions()) {
-            logger.info(option.getArgName() + " / " + option.getDescription() + " / " + option.getOpt() + " / "
-                    + option.getValue());
-        }
-        logger.info("Application master args = " + Arrays.toString(cliParser.getOptions()));

-        if (!cliParser.hasOption("cluster")) {
+        if (Strings.isNullOrEmpty(yarnArgs.cluster)) {
             throw new IllegalArgumentException("No cluster ID specified to be provisioned by application master");
         }
-        cluster = cliParser.getOptionValue("cluster");
-        zkString = cliParser.getOptionValue("zk");
-
-        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "128"));
-        numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
-        requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
-        user = cliParser.getOptionValue("user");
-
-        if (cliParser.hasOption("extraModulesClasses")) {
-            extraModulesClasses = cliParser.getOptionValue("extraModulesClasses");
-        }
-        if (cliParser.hasOption("namedStringParameters")) {
-            namedStringParameters = cliParser.getOptionValue("namedStringParameters");
-        }
+        // memory requested for each S4 node container; it is later clamped to the YARN cluster's min/max capabilities
+        containerMemory = yarnArgs.containerMemory;

         conf = new YarnConfiguration();
-        if (cliParser.hasOption("test")) {
-            testMode = true;
+        if (yarnArgs.test) {
             conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
         }
         rpc = YarnRPC.create(conf);
-
-        return true;
-    }
-
-    /**
-     * Helper function to print usage
-     * 
-     * @param opts
-     *            Parsed command line options
-     */
-    private void printUsage(Options opts) {
-        new HelpFormatter().printHelp("ApplicationMaster", opts);
     }

     /**
@@ -320,23 +237,23 @@ public class S4ApplicationMaster {
         // a multiple of the min value and cannot exceed the max.
         // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
         if (containerMemory < minMem) {
-            logger.info("Container memory specified below min threshold of cluster. Using min value." + ", specified="
-                    + containerMemory + ", min=" + minMem);
+            logger.info("Container memory for S4 node specified below min threshold of YARN cluster. Using min value."
+                    + ", specified=" + containerMemory + ", min=" + minMem);
             containerMemory = minMem;
         } else if (containerMemory > maxMem) {
-            logger.info("Container memory specified above max threshold of cluster. Using max value." + ", specified="
-                    + containerMemory + ", max=" + maxMem);
+            logger.info("Container memory for S4 node specified above max threshold of YARN cluster. Using max value."
+                    + ", specified=" + containerMemory + ", max=" + maxMem);
             containerMemory = maxMem;
         }
 
         int loopCounter = -1;
 
-        while (numCompletedContainers.get() < numTotalContainers && !appDone) {
+        while (numCompletedContainers.get() < yarnArgs.numContainers && !appDone) {
             loopCounter++;
 
             // log current state
             logger.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total="
-                    + numTotalContainers + ", requested=" + numRequestedContainers + ", completed="
+                    + yarnArgs.numContainers + ", requested=" + numRequestedContainers + ", completed="
                     + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated="
                     + numAllocatedContainers);
 
@@ -354,7 +271,7 @@ public class S4ApplicationMaster {
             // For the first loop, askCount will be equal to total containers needed
             // From that point on, askCount will always be 0 as current implementation
             // does not change its ask on container failures.
-            int askCount = numTotalContainers - numRequestedContainers.get();
+            int askCount = yarnArgs.numContainers - numRequestedContainers.get();
             numRequestedContainers.addAndGet(askCount);
 
             // Setup request to be sent to RM to allocate containers
@@ -432,12 +349,12 @@ public class S4ApplicationMaster {
                 }
 
             }
-            if (numCompletedContainers.get() == numTotalContainers) {
+            if (numCompletedContainers.get() == yarnArgs.numContainers) {
                 appDone = true;
             }
 
             logger.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total="
-                    + numTotalContainers + ", requested=" + numRequestedContainers + ", completed="
+                    + yarnArgs.numContainers + ", requested=" + numRequestedContainers + ", completed="
                     + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated="
                     + numAllocatedContainers);
 
@@ -466,7 +383,7 @@ public class S4ApplicationMaster {
             finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
         } else {
             finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
-            String diagnostics = "Diagnostics." + ", total=" + numTotalContainers + ", completed="
+            String diagnostics = "Diagnostics." + ", total=" + yarnArgs.numContainers + ", completed="
                     + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed="
                     + numFailedContainers.get();
             finishReq.setDiagnostics(diagnostics);
@@ -522,8 +439,9 @@ public class S4ApplicationMaster {
             ctx.setResource(container.getResource());

             try {
-                if (!Strings.isNullOrEmpty(user)) {
-                    ctx.setUser(user);
+                // run the S4 node as the user given on the command line, or as the current user by default
+                if (!Strings.isNullOrEmpty(yarnArgs.user)) {
+                    ctx.setUser(yarnArgs.user);
                 } else {
                     logger.info("Using default user name {}", UserGroupInformation.getCurrentUser().getShortUserName());
                     ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
@@ -576,42 +494,40 @@ public class S4ApplicationMaster {
             Vector<CharSequence> vargs = new Vector<CharSequence>(5);

             vargs.add("java");
-            // TODO add memory parameter
+
+            // heap size of the S4 node JVM, taken from the requested container memory
+            // NOTE(review): JVM non-heap memory comes on top of -Xmx; confirm the container leaves room for it
+            vargs.add("-Xmx" + containerMemory + "m");
+
+            // extra JVM options, space-separated, placed before the main class
+            S4YarnClient.addListElementsToCommandLineBuffer(vargs, null, " ", yarnArgs.extraS4NodeJVMParams);
             // vargs.add("-Xdebug");
             // vargs.add("-Xrunjdwp:transport=dt_socket,address=8889,server=y");
             vargs.add("org.apache.s4.core.Main");
-            vargs.add("-zk=" + zkString);
-            vargs.add("-c=" + cluster);
+            vargs.add("-zk=" + yarnArgs.zkString);
+            vargs.add("-c=" + yarnArgs.cluster);

-            if (Strings.isNullOrEmpty(extraModulesClasses)) {
-                extraModulesClasses = HdfsFetcherModule.class.getName();
-            } else {
-                extraModulesClasses += "," + HdfsFetcherModule.class.getName();
-            }
+            List<String> extraModulesClasses = Lists.newArrayList(yarnArgs.extraModulesClasses);
             // add module for fetchings from hdfs
-            vargs.add("-extraModulesClasses=" + extraModulesClasses);
+            extraModulesClasses.add(HdfsFetcherModule.class.getName());
+            S4YarnClient.addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.EXTRA_MODULES_CLASSES, ",",
+                    extraModulesClasses);

             // add reference to the configuration
-            if (testMode) {
-                if (Strings.isNullOrEmpty(namedStringParameters)) {
-                    namedStringParameters = FileSystem.FS_DEFAULT_NAME_KEY + "="
-                            + conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
-                } else {
-                    namedStringParameters += "," + FileSystem.FS_DEFAULT_NAME_KEY + "="
-                            + conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
-                }
-            }
-            if (!Strings.isNullOrEmpty(namedStringParameters)) {
-                vargs.add("-namedStringParameters=" + namedStringParameters);
+            List<String> namedStringParams = Lists.newArrayList(yarnArgs.extraNamedParameters);
+            if (yarnArgs.test) {
+                namedStringParams.add(FileSystem.FS_DEFAULT_NAME_KEY + "=" + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
             }
+            S4YarnClient.addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.NAMED_STRING_PARAMETERS, ",",
+                    namedStringParams);

             // TODO
             // We should redirect the output to hdfs instead of local logs
             // so as to be able to look at the final output after the containers
             // have been released.
             // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err]
-            vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-            vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+            vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/s4-node-stdout");
+            vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/s4-node-stderr");

             // Get final commmand
             StringBuilder command = new StringBuilder();
@@ -693,7 +617,7 @@ public class S4ApplicationMaster {
         // set the priority for the request
         Priority pri = Records.newRecord(Priority.class);
         // TODO - what is the range for priority? how to decide?
-        pri.setPriority(requestPriority);
+        pri.setPriority(yarnArgs.priority);
         request.setPriority(pri);
 
         // Set up resource type requirements
@@ -719,7 +643,7 @@ public class S4ApplicationMaster {
         req.setApplicationAttemptId(appAttemptID);
         req.addAllAsks(requestedContainers);
         req.addAllReleases(releasedContainers);
-        req.setProgress((float) numCompletedContainers.get() / numTotalContainers);
+        req.setProgress((float) numCompletedContainers.get() / yarnArgs.numContainers);
 
         logger.info("Sending request to RM for containers" + ", requestedSet=" + requestedContainers.size()
                 + ", releasedSet=" + releasedContainers.size() + ", progress=" + req.getProgress());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4CLIYarnArgs.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4CLIYarnArgs.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4CLIYarnArgs.java
new file mode 100644
index 0000000..2203fc0
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4CLIYarnArgs.java
@@ -0,0 +1,86 @@
+package org.apache.s4.tools.yarn;
+
+import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+
+/**
+ * Command-line arguments of the S4 YARN client: the options shared with the application master (inherited from
+ * {@link CommonS4YarnArgs}) plus the client-only options.
+ */
+@Parameters(separators = "=")
+class S4CLIYarnArgs extends CommonS4YarnArgs {
+
+    public static final String S4_YARN_MASTER_MEMORY = "-s4YarnMasterMemory";
+
+    public static final String QUEUE = "-queue";
+
+    public static final String S4_DIR = "-s4Dir";
+
+    @Parameter(names = "-appName", description = "Name of the application", required = false)
+    String appName = "S4App";
+
+    @Parameter(names = { "-nbTasks", "-nbPartitions" }, description = "Number of partitions of the S4 cluster", required = true)
+    int nbTasks;
+
+    // not required: otherwise the default port below could never be used
+    @Parameter(names = { "-flp", "-firstListeningPort" }, description = "First listening port for S4 nodes", required = false)
+    int flp = 12000;
+
+    @Parameter(names = "-s4r", description = "URI to S4 archive (.s4r) file. It will be automatically deployed on the allocated S4 nodes. Examples: file:///home/s4/file.s4r or more probably hdfs:///hostname:port/path/file.s4r", required = true)
+    String s4rPath;
+
+    @Parameter(names = S4_DIR, description = "S4 directory. It is used to resolve S4 platform libraries and dependencies, currently from s4dir/subprojects/s4-yarn/build/install/s4-yarn/lib", required = true, hidden = true)
+    String s4Dir;
+
+    @Parameter(names = QUEUE, description = "YARN parameter:  RM Queue in which this application is to be submitted", required = false)
+    String queue = "default";
+
+    @Parameter(names = "-timeout", description = "YARN parameter: Application timeout in milliseconds (default is: -1 = no timeout)", required = false)
+    int timeout = -1;
+
+    @Parameter(names = { "-master_memory", S4_YARN_MASTER_MEMORY }, description = "YARN parameter: Amount of memory in MB to be requested to run the application master", required = false, validateWith = S4CLIYarnArgs.MemoryValidator.class)
+    int masterMemory = 256;
+
+    @Parameter(names = "-log_properties", description = "YARN parameter: log4j.properties file", required = false)
+    String logProperties = "";
+
+    /**
+     * Rejects memory sizes that are negative or not integers.
+     */
+    public static class MemoryValidator implements IParameterValidator {
+
+        @Override
+        public void validate(String name, String value) throws ParameterException {
+            if (parseIntParameter(value, "Invalid memory size: ") < 0) {
+                throw new ParameterException("Invalid memory size: " + value);
+            }
+        }
+    }
+
+    /**
+     * Rejects numbers of containers that are lower than 1 or not integers.
+     */
+    public static class NbContainersValidator implements IParameterValidator {
+        @Override
+        public void validate(String name, String value) throws ParameterException {
+            if (parseIntParameter(value, "Invalid number of containers: ") < 1) {
+                throw new ParameterException("Invalid number of containers: " + value);
+            }
+        }
+    }
+
+    /**
+     * Parses an integer option value. A malformed value is reported as a {@link ParameterException}, the exception
+     * type declared by {@link IParameterValidator#validate}, rather than as a {@link NumberFormatException}.
+     */
+    private static int parseIntParameter(String value, String errorPrefix) {
+        try {
+            return Integer.parseInt(value);
+        } catch (NumberFormatException e) {
+            throw new ParameterException(errorPrefix + value);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
index 8eb5cce..a4b5a12 100644
--- a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
@@ -122,7 +122,7 @@ public class S4YarnClient extends YarnClientImpl {
     // Configuration
     private Configuration conf;
 
-    YarnArgs yarnArgs;
+    S4CLIYarnArgs yarnArgs;
 
     private int amMemory;
 
@@ -134,7 +134,7 @@ public class S4YarnClient extends YarnClientImpl {
      */
     public static void main(String[] args) {
 
-        YarnArgs yarnArgs = new YarnArgs();
+        S4CLIYarnArgs yarnArgs = new S4CLIYarnArgs();
         logger.info("S4YarnClient args = " + Arrays.toString(args));
 
         Tools.parseArgs(yarnArgs, args);
@@ -155,7 +155,8 @@ public class S4YarnClient extends YarnClientImpl {
                     return YARN_CONF_FILES.contains(pathname.getName());
                 }
             }).length == 4)) {
-                logger.error("The {} directory must contain files [core,hdfs,yarn,mapred]-site.xml");
+                logger.error("The {} directory must contain files [core,hdfs,yarn,mapred]-site.xml",
+                        HADOOP_CONF_DIR_ENV);
                 System.exit(1);
             }
 
@@ -177,7 +178,7 @@ public class S4YarnClient extends YarnClientImpl {
         System.exit(1);
     }
 
-    public S4YarnClient(YarnArgs yarnArgs, Configuration conf) throws Exception {
+    public S4YarnClient(S4CLIYarnArgs yarnArgs, Configuration conf) throws Exception {
         this.yarnArgs = yarnArgs;
         this.conf = conf;
         init(this.conf);
@@ -235,6 +236,9 @@ public class S4YarnClient extends YarnClientImpl {
         // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
         // a multiple of the min value and cannot exceed the max.
         // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
+
+        amMemory = yarnArgs.masterMemory;
+
         if (amMemory < minMem) {
             logger.info("AM memory specified below min threshold of cluster. Using min value." + ", specified="
                     + amMemory + ", min=" + minMem);
@@ -308,6 +312,7 @@ public class S4YarnClient extends YarnClientImpl {
         logger.info("Setting up app master command");
 
         // vargs.add("${JAVA_HOME}" + "/bin/java");
+        // TODO set java from JAVA_HOME
         vargs.add("java");
         // Set Xmx based on am memory size
         vargs.add("-Xmx" + amMemory + "m");
@@ -316,27 +321,17 @@ public class S4YarnClient extends YarnClientImpl {
         // Set Application Master class name
         vargs.add(S4ApplicationMaster.class.getName());
         // Set params for Application Master
-        vargs.add("--container_memory " + String.valueOf(yarnArgs.containerMemory));
-        vargs.add("--num_containers " + String.valueOf(yarnArgs.numContainers));
-        vargs.add("--priority " + String.valueOf(yarnArgs.priority));
-        if (!yarnArgs.extraModulesClasses.isEmpty()) {
-            StringBuilder sb = new StringBuilder();
-            sb.append("-extraModulesClasses ");
-            ListIterator<String> it = yarnArgs.extraModulesClasses.listIterator();
-            while (it.hasNext()) {
-                sb.append(it.next() + (it.hasNext() ? "," : ""));
-            }
-            vargs.add(sb.toString());
-        }
-        if (!yarnArgs.extraNamedParameters.isEmpty()) {
-            StringBuilder sb = new StringBuilder();
-            sb.append("-namedStringParameters ");
-            ListIterator<String> it = yarnArgs.extraNamedParameters.listIterator();
-            while (it.hasNext()) {
-                sb.append(it.next() + (it.hasNext() ? "," : ""));
-            }
-            vargs.add(sb.toString());
-        }
+        vargs.add(CommonS4YarnArgs.S4_NODE_MEMORY + " " + String.valueOf(yarnArgs.containerMemory));
+        vargs.add(CommonS4YarnArgs.NB_S4_NODES + " " + String.valueOf(yarnArgs.numContainers));
+        vargs.add(CommonS4YarnArgs.PRIORITY + " " + String.valueOf(yarnArgs.priority));
+
+        // list-valued options are forwarded as single "-option=value1,value2,..." arguments
+        addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.S4_NODE_JVM_PARAMETERS, ",",
+                yarnArgs.extraS4NodeJVMParams);
+        addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.EXTRA_MODULES_CLASSES, ",",
+                yarnArgs.extraModulesClasses);
+        addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.NAMED_STRING_PARAMETERS, ",",
+                yarnArgs.extraNamedParameters);

         vargs.add("-c " + String.valueOf(yarnArgs.cluster));
         vargs.add("-zk " + String.valueOf(yarnArgs.zkString));
@@ -405,6 +396,21 @@ public class S4YarnClient extends YarnClientImpl {
 
     }
 
+    /**
+     * Adds one entry to {@code vargs}: {@code paramName=} (omitted when paramName is null or empty) followed by
+     * the {@code yarnArg} values joined with {@code paramSeparator}. Adds nothing if yarnArg is null or empty.
+     */
+    public static void addListElementsToCommandLineBuffer(Vector<CharSequence> vargs, String paramName,
+            String paramSeparator, List<String> yarnArg) {
+        if (yarnArg != null && !yarnArg.isEmpty()) {
+            StringBuilder entry = new StringBuilder(Strings.isNullOrEmpty(paramName) ? "" : paramName + "=");
+            for (ListIterator<String> values = yarnArg.listIterator(); values.hasNext();) {
+                entry.append(values.nextIndex() == 0 ? "" : paramSeparator).append(values.next());
+            }
+            vargs.add(entry.toString());
+        }
+    }
+
     private Path copyToLocalResources(ApplicationId appId, FileSystem fs, Map<String, LocalResource> localResources,
             File file) throws IOException {
         Path src = new Path(file.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/YarnArgs.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/YarnArgs.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/YarnArgs.java
deleted file mode 100644
index ac568b3..0000000
--- a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/YarnArgs.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.apache.s4.tools.yarn;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.core.Main.InlineConfigParameterConverter;
-
-import com.beust.jcommander.IParameterValidator;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.beust.jcommander.Parameters;
-
-@Parameters(commandNames = "yarn", separators = "=", commandDescription = "Deploy S4 application on YARN")
-class YarnArgs {
-
-    @Parameter(names = "-appName", description = "Name of the application", required = false)
-    String appName = "S4App";
-
-    @Parameter(names = "-priority", description = "Application priority", required = false)
-    int priority = 0;
-
-    @Parameter(names = "-queue", description = "RM Queue in which this application is to be submitted", required = false)
-    String queue = "default";
-
-    @Parameter(names = "-user", description = "User to run the application as", required = false)
-    String user = "";
-
-    @Parameter(names = "-timeout", description = "Application timeout in milliseconds (default is: -1 = no timeout)", required = false)
-    int timeout = -1;
-
-    @Parameter(names = "-s4Dir", description = "S4 directory. It is used to resolve S4 platform libraries and dependencies, currently from s4dir/subprojects/s4-yarn/build/install/s4-yarn/lib", required = true, hidden = true)
-    String s4Dir;
-
-    public static class MemoryValidator implements IParameterValidator {
-
-        @Override
-        public void validate(String name, String value) throws ParameterException {
-            if (Integer.valueOf(value).intValue() < 0) {
-                throw new ParameterException("Invalid memory size: " + value);
-            }
-        }
-    }
-
-    @Parameter(names = "-master_memory", description = "Amount of memory in MB to be requested to run the application master", required = false, validateWith = YarnArgs.MemoryValidator.class)
-    int masterMemory = 256;
-
-    @Parameter(names = { "-cluster", "-c" }, description = "Name of the S4 logical cluster that will contain S4 nodes. NOTE: the cluster is currently defined automatically in ZooKeeper.", required = true)
-    String cluster;
-
-    @Parameter(names = { "-nbTasks", "-nbPartitions" }, description = "Number of partitions of the S4 cluster", required = true)
-    int nbTasks;
-
-    @Parameter(names = { "-flp", "-firstListeningPort" }, description = "First listening port for S4 nodes", required = true)
-    int flp = 12000;
-
-    @Parameter(names = "-s4r", description = "URI to S4 archive (.s4r) file. It will be automatically deployed on the allocated S4 nodes. Examples: file:///home/s4/file.s4r or more probably hdfs:///hostname:port/path/file.s4r", required = true)
-    String s4rPath;
-
-    @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false, hidden = false)
-    List<String> extraModulesClasses = new ArrayList<String>();
-
-    @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
-    List<String> extraNamedParameters = new ArrayList<String>();
-
-    @Parameter(names = "-container_memory", description = "Amount of memory in MB to be requested to host the S4 node", required = false, validateWith = YarnArgs.MemoryValidator.class)
-    int containerMemory = 10;
-
-    public static class NbContainersValidator implements IParameterValidator {
-        @Override
-        public void validate(String name, String value) throws ParameterException {
-            if (Integer.valueOf(value).intValue() < 1) {
-                throw new ParameterException("Invalid number of containers: " + value);
-            }
-        }
-    }
-
-    @Parameter(names = "-num_containers", description = "Number of containers on which the S4 node needs to be hosted (typically: at least as many partitions as the logical cluster)", validateWith = YarnArgs.NbContainersValidator.class)
-    int numContainers = 1;
-
-    @Parameter(names = "-zk", description = "S4 Zookeeper cluster manager connection string", required = true)
-    String zkString;
-
-    @Parameter(names = "-log_properties", description = "log4j.properties file", required = false)
-    String logProperties = "";
-
-    @Parameter(names = "-debug", description = "Dump out debug information")
-    boolean debug;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java b/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java
index 639c566..771c929 100644
--- a/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java
+++ b/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.io.Files;
 import com.google.inject.Injector;
 
+// NOTE: requires an up-to-date s4-yarn installed app. This is automatic when tests are run from the command line; otherwise run: gradlew s4-yarn:installApp
 public class TestYarnDeployment extends ZkBasedTest {
 
     private static Logger logger = LoggerFactory.getLogger(TestYarnDeployment.class);
@@ -132,7 +133,7 @@ public class TestYarnDeployment extends ZkBasedTest {
         final String[] params = ("-cluster=cluster1 -nbTasks=2 -flp=14000 -s4r=" + destS4rPath.toUri().toString()
                 + " -zk=localhost:2181 -s4Dir=" + gradlewFile.getParentFile().getAbsolutePath()).split("[ ]");
 
-        YarnArgs yarnArgs = new YarnArgs();
+        S4CLIYarnArgs yarnArgs = new S4CLIYarnArgs();
 
         Tools.parseArgs(yarnArgs, params);
         final S4YarnClient client = new S4YarnClient(yarnArgs, conf);