You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text version.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/11/20 16:14:07 UTC
git commit: Improve parameters parsing/handling - memory parameters -
other JVM parameters
Updated Branches:
refs/heads/S4-25 15c617326 -> b590862eb
Improve parameters parsing/handling
- memory parameters
- other JVM parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/b590862e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/b590862e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/b590862e
Branch: refs/heads/S4-25
Commit: b590862ebc7e3e77f3365179545ade1f3a958e74
Parents: 15c6173
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Nov 19 18:22:04 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Nov 20 16:31:30 2012 +0100
----------------------------------------------------------------------
.../apache/s4/tools/yarn/AppMasterYarnArgs.java | 12 +
.../org/apache/s4/tools/yarn/CommonS4YarnArgs.java | 60 +++++
.../apache/s4/tools/yarn/S4ApplicationMaster.java | 194 +++++----------
.../org/apache/s4/tools/yarn/S4CLIYarnArgs.java | 63 +++++
.../org/apache/s4/tools/yarn/S4YarnClient.java | 56 +++--
.../java/org/apache/s4/tools/yarn/YarnArgs.java | 89 -------
.../apache/s4/tools/yarn/TestYarnDeployment.java | 3 +-
7 files changed, 227 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/AppMasterYarnArgs.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/AppMasterYarnArgs.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/AppMasterYarnArgs.java
new file mode 100644
index 0000000..a3e17dc
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/AppMasterYarnArgs.java
@@ -0,0 +1,18 @@
+package org.apache.s4.tools.yarn;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/**
+ * Command-line arguments of the S4 YARN application master ({@link S4ApplicationMaster}).
+ * <p>
+ * Everything except the application attempt id is inherited from {@link CommonS4YarnArgs}.
+ */
+@Parameters(separators = "=")
+public class AppMasterYarnArgs extends CommonS4YarnArgs {
+
+    /** Only read when the AM container id is not available from the environment (e.g. in tests). */
+    @Parameter(names = "-app_attempt_id", description = "App Attempt ID. Not to be used unless for testing purposes")
+    String appAttemptId;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/CommonS4YarnArgs.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/CommonS4YarnArgs.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/CommonS4YarnArgs.java
new file mode 100644
index 0000000..222ac5a
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/CommonS4YarnArgs.java
@@ -0,0 +1,72 @@
+package org.apache.s4.tools.yarn;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.Main.InlineConfigParameterConverter;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command-line parameters shared by the S4 YARN client ({@link S4CLIYarnArgs}) and the S4 YARN application master
+ * ({@link AppMasterYarnArgs}).
+ * <p>
+ * Option names are exposed as constants so that the client can forward the values it received to the application
+ * master, and the application master to the S4 nodes it launches, without duplicating string literals.
+ */
+public class CommonS4YarnArgs {
+
+    public static final String NAMED_STRING_PARAMETERS = "-namedStringParameters";
+
+    public static final String EXTRA_MODULES_CLASSES = "-extraModulesClasses";
+
+    public static final String PRIORITY = "-priority";
+
+    public static final String S4_NODE_MEMORY = "-s4NodeMemory";
+
+    public static final String S4_NODE_JVM_PARAMETERS = "-s4NodeJVMParameters";
+
+    public static final String NB_S4_NODES = "-nbS4Nodes";
+
+    public static final String USER = "-user";
+
+    @Parameter(names = { "-cluster", "-c" }, description = "Name of the S4 logical cluster that will contain S4 nodes. NOTE: the cluster is currently defined automatically in ZooKeeper.", required = true)
+    String cluster;
+
+    @Parameter(names = "-zk", description = "S4 Zookeeper cluster manager connection string", required = true)
+    String zkString;
+
+    // Memory, in MB, requested from YARN for each container hosting an S4 node.
+    @Parameter(names = { S4_NODE_MEMORY, "-container_memory" }, description = "YARN parameter: Amount of memory in MB to be requested to host the S4 node", required = false, validateWith = S4CLIYarnArgs.MemoryValidator.class)
+    int containerMemory = 256;
+
+    @Parameter(names = PRIORITY, description = "YARN parameter: Application priority", required = false)
+    int priority = 0;
+
+    // Empty means: the application master launches the S4 nodes as the current user.
+    @Parameter(names = USER, description = "YARN parameter: User to run the application as", required = false)
+    String user = "";
+
+    @Parameter(names = { NB_S4_NODES, "-num_containers" }, description = "YARN parameter: Number of containers on which the S4 node needs to be hosted (typically: at least as many partitions as the logical cluster)", validateWith = S4CLIYarnArgs.NbContainersValidator.class)
+    int numContainers = 1;
+
+    @Parameter(names = "-debug", description = "YARN parameter: Dump out debug information")
+    boolean debug;
+
+    // In test mode, the application master uses hdfs://localhost:9000 as default file system.
+    @Parameter(names = "-test", description = "Test mode")
+    boolean test;
+
+    @Parameter(names = { EXTRA_MODULES_CLASSES, "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false, hidden = false)
+    List<String> extraModulesClasses = new ArrayList<String>();
+
+    @Parameter(names = { NAMED_STRING_PARAMETERS, "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2'", hidden = false, converter = InlineConfigParameterConverter.class)
+    List<String> extraNamedParameters = new ArrayList<String>();
+
+    // TODO parse JVM parameters that include commas: values are given as a comma-separated list, so a single JVM
+    // option that itself contains a comma cannot be passed through this parameter yet.
+    @Parameter(names = S4_NODE_JVM_PARAMETERS, description = "Extra JVM parameters for running the S4 nodes, specified as a comma separated list. The memory is usually configured through "
+            + S4_NODE_MEMORY, required = false)
+    List<String> extraS4NodeJVMParams = new ArrayList<String>();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
index 7f025dd..c3e847f 100644
--- a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
@@ -30,12 +30,6 @@ import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -72,10 +66,12 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.s4.deploy.HdfsFetcherModule;
+import org.apache.s4.tools.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
/**
* An ApplicationMaster for launching S4 nodes on a set of launched containers using the YARN framework.
@@ -109,14 +105,6 @@ public class S4ApplicationMaster {
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
- // App Master configuration
- // No. of containers to host S4 nodes on
- private int numTotalContainers = 1;
- // Memory to request for the container on which the S4 nodes will be hosted
- private int containerMemory = 10;
- // Priority of the request
- private int requestPriority;
-
// Incremental counter for rpc calls to the RM
private AtomicInteger rmRequestID = new AtomicInteger();
@@ -134,24 +122,15 @@ public class S4ApplicationMaster {
// Only request for more if the original requirement changes.
private AtomicInteger numRequestedContainers = new AtomicInteger();
- // The cluster (or application) name
- private String cluster = "";
-
- private String zkString = "";
-
// Containers to be released
private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
// Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>();
- private boolean testMode;
-
- private String user;
-
- private String extraModulesClasses;
+ private int containerMemory;
- private String namedStringParameters;
+ private static AppMasterYarnArgs yarnArgs;
/**
* @param args
@@ -160,13 +139,16 @@ public class S4ApplicationMaster {
public static void main(String[] args) {
boolean result = false;
try {
+ yarnArgs = new AppMasterYarnArgs();
+ logger.info("S4YarnApplicationMaster args = " + Arrays.toString(args));
+
+ Tools.parseArgs(yarnArgs, args);
+
+ Thread.sleep(10000);
S4ApplicationMaster appMaster = new S4ApplicationMaster();
logger.info("Initializing ApplicationMaster with args " + Arrays.toString(args));
- boolean doRun = appMaster.init(args);
- if (!doRun) {
- System.exit(0);
- }
+ appMaster.init();
result = appMaster.run();
} catch (Throwable t) {
t.printStackTrace();
@@ -195,48 +177,15 @@ public class S4ApplicationMaster {
}
/**
- * Parse command line options
- *
- * @param args
- * Command line args
- * @return Whether init successful and run should be invoked
- * @throws ParseException
- * @throws IOException
*/
- public boolean init(String[] args) throws ParseException, IOException {
-
- Options opts = new Options();
- opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
- opts.addOption("c", "cluster", true, "The cluster (or application) name");
- opts.addOption("zk", true, "Zookeeper connection string");
- opts.addOption("container_memory", true, "Amount of memory in MB to be requested to host the S4 node");
- opts.addOption("num_containers", true, "No. of containers on which the S4 node needs to be hosted");
- opts.addOption("priority", true, "Application Priority. Default 0");
- opts.addOption("debug", false, "Dump out debug information");
- opts.addOption("test", false, "Test mode");
- opts.addOption("extraModulesClasses", true, "Extra modules classes for S4 node configuration");
- opts.addOption("namedStringParameters", true, "Named configuration parameters for S4 node configuration");
-
- opts.addOption("help", false, "Print usage");
- CommandLine cliParser = new GnuParser().parse(opts, args);
-
- if (args.length == 0) {
- printUsage(opts);
- throw new IllegalArgumentException("No args specified for application master to initialize");
- }
-
- if (cliParser.hasOption("help")) {
- printUsage(opts);
- return false;
- }
+ public void init() throws IOException {
Map<String, String> envs = System.getenv();
appAttemptID = Records.newRecord(ApplicationAttemptId.class);
if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
- if (cliParser.hasOption("app_attempt_id")) {
- String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
- appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
+ if (!Strings.isNullOrEmpty(yarnArgs.appAttemptId)) {
+ appAttemptID = ConverterUtils.toApplicationAttemptId(yarnArgs.appAttemptId);
} else {
throw new IllegalArgumentException("Application Attempt Id not set in the environment");
}
@@ -248,49 +197,17 @@ public class S4ApplicationMaster {
logger.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
+ ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
+ appAttemptID.getAttemptId());
- logger.info("Application master args = " + Arrays.toString(cliParser.getArgs()));
- for (Option option : cliParser.getOptions()) {
- logger.info(option.getArgName() + " / " + option.getDescription() + " / " + option.getOpt() + " / "
- + option.getValue());
- }
- logger.info("Application master args = " + Arrays.toString(cliParser.getOptions()));
- if (!cliParser.hasOption("cluster")) {
+ if (Strings.isNullOrEmpty(yarnArgs.cluster)) {
throw new IllegalArgumentException("No cluster ID specified to be provisioned by application master");
}
- cluster = cliParser.getOptionValue("cluster");
- zkString = cliParser.getOptionValue("zk");
-
- containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "128"));
- numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
- requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
- user = cliParser.getOptionValue("user");
-
- if (cliParser.hasOption("extraModulesClasses")) {
- extraModulesClasses = cliParser.getOptionValue("extraModulesClasses");
- }
- if (cliParser.hasOption("namedStringParameters")) {
- namedStringParameters = cliParser.getOptionValue("namedStringParameters");
- }
conf = new YarnConfiguration();
- if (cliParser.hasOption("test")) {
- testMode = true;
+ if (yarnArgs.test) {
+ // testMode = true;
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
}
rpc = YarnRPC.create(conf);
-
- return true;
- }
-
- /**
- * Helper function to print usage
- *
- * @param opts
- * Parsed command line options
- */
- private void printUsage(Options opts) {
- new HelpFormatter().printHelp("ApplicationMaster", opts);
}
/**
@@ -320,23 +237,23 @@ public class S4ApplicationMaster {
// a multiple of the min value and cannot exceed the max.
// If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
if (containerMemory < minMem) {
- logger.info("Container memory specified below min threshold of cluster. Using min value." + ", specified="
- + containerMemory + ", min=" + minMem);
+ logger.info("Container memory for S4 node specified below min threshold of YARN cluster. Using min value."
+ + ", specified=" + containerMemory + ", min=" + minMem);
containerMemory = minMem;
} else if (containerMemory > maxMem) {
- logger.info("Container memory specified above max threshold of cluster. Using max value." + ", specified="
- + containerMemory + ", max=" + maxMem);
+ logger.info("Container memory for S4 node specified above max threshold of YARN cluster. Using max value."
+ + ", specified=" + containerMemory + ", max=" + maxMem);
containerMemory = maxMem;
}
int loopCounter = -1;
- while (numCompletedContainers.get() < numTotalContainers && !appDone) {
+ while (numCompletedContainers.get() < yarnArgs.numContainers && !appDone) {
loopCounter++;
// log current state
logger.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total="
- + numTotalContainers + ", requested=" + numRequestedContainers + ", completed="
+ + yarnArgs.numContainers + ", requested=" + numRequestedContainers + ", completed="
+ numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated="
+ numAllocatedContainers);
@@ -354,7 +271,7 @@ public class S4ApplicationMaster {
// For the first loop, askCount will be equal to total containers needed
// From that point on, askCount will always be 0 as current implementation
// does not change its ask on container failures.
- int askCount = numTotalContainers - numRequestedContainers.get();
+ int askCount = yarnArgs.numContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
// Setup request to be sent to RM to allocate containers
@@ -432,12 +349,12 @@ public class S4ApplicationMaster {
}
}
- if (numCompletedContainers.get() == numTotalContainers) {
+ if (numCompletedContainers.get() == yarnArgs.numContainers) {
appDone = true;
}
logger.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total="
- + numTotalContainers + ", requested=" + numRequestedContainers + ", completed="
+ + yarnArgs.numContainers + ", requested=" + numRequestedContainers + ", completed="
+ numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated="
+ numAllocatedContainers);
@@ -466,7 +383,7 @@ public class S4ApplicationMaster {
finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
} else {
finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
- String diagnostics = "Diagnostics." + ", total=" + numTotalContainers + ", completed="
+ String diagnostics = "Diagnostics." + ", total=" + yarnArgs.numContainers + ", completed="
+ numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed="
+ numFailedContainers.get();
finishReq.setDiagnostics(diagnostics);
@@ -512,6 +429,13 @@ public class S4ApplicationMaster {
* start request to the CM.
*/
public void run() {
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e2) {
+ // TODO Auto-generated catch block
+ e2.printStackTrace();
+ }
// Connect to ContainerManager
connectToCM();
@@ -522,8 +446,8 @@ public class S4ApplicationMaster {
ctx.setResource(container.getResource());
try {
- if (!Strings.isNullOrEmpty(user)) {
- ctx.setUser(user);
+ if (!Strings.isNullOrEmpty(yarnArgs.user)) {
+ ctx.setUser(yarnArgs.user);
} else {
logger.info("Using default user name {}", UserGroupInformation.getCurrentUser().getShortUserName());
ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
@@ -576,42 +500,42 @@ public class S4ApplicationMaster {
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
vargs.add("java");
+
+ vargs.add("-Xmx" + containerMemory + "m");
+
+ S4YarnClient.addListElementsToCommandLineBuffer(vargs, null, " ", yarnArgs.extraS4NodeJVMParams);
+
+ if (!yarnArgs.extraS4NodeJVMParams.isEmpty()) {
+ vargs.add(CommonS4YarnArgs.S4_NODE_JVM_PARAMETERS);
+ }
// TODO add memory parameter
// vargs.add("-Xdebug");
// vargs.add("-Xrunjdwp:transport=dt_socket,address=8889,server=y");
vargs.add("org.apache.s4.core.Main");
- vargs.add("-zk=" + zkString);
- vargs.add("-c=" + cluster);
+ vargs.add("-zk=" + yarnArgs.zkString);
+ vargs.add("-c=" + yarnArgs.cluster);
- if (Strings.isNullOrEmpty(extraModulesClasses)) {
- extraModulesClasses = HdfsFetcherModule.class.getName();
- } else {
- extraModulesClasses += "," + HdfsFetcherModule.class.getName();
- }
+ List<String> extraModulesClasses = Lists.newArrayList(yarnArgs.extraModulesClasses);
// add module for fetchings from hdfs
- vargs.add("-extraModulesClasses=" + extraModulesClasses);
+ extraModulesClasses.add(HdfsFetcherModule.class.getName());
+ S4YarnClient.addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.EXTRA_MODULES_CLASSES, ",",
+ extraModulesClasses);
// add reference to the configuration
- if (testMode) {
- if (Strings.isNullOrEmpty(namedStringParameters)) {
- namedStringParameters = FileSystem.FS_DEFAULT_NAME_KEY + "="
- + conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
- } else {
- namedStringParameters += "," + FileSystem.FS_DEFAULT_NAME_KEY + "="
- + conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
- }
- }
- if (!Strings.isNullOrEmpty(namedStringParameters)) {
- vargs.add("-namedStringParameters=" + namedStringParameters);
+ List<String> namedStringParams = Lists.newArrayList(yarnArgs.extraNamedParameters);
+ if (yarnArgs.test) {
+ namedStringParams.add(FileSystem.FS_DEFAULT_NAME_KEY + "=" + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
}
+ S4YarnClient.addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.NAMED_STRING_PARAMETERS, ",",
+ namedStringParams);
// TODO
// We should redirect the output to hdfs instead of local logs
// so as to be able to look at the final output after the containers
// have been released.
// Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err]
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/s4-node-stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/s4-node-stderr");
// Get final commmand
StringBuilder command = new StringBuilder();
@@ -693,7 +617,7 @@ public class S4ApplicationMaster {
// set the priority for the request
Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide?
- pri.setPriority(requestPriority);
+ pri.setPriority(yarnArgs.priority);
request.setPriority(pri);
// Set up resource type requirements
@@ -719,7 +643,7 @@ public class S4ApplicationMaster {
req.setApplicationAttemptId(appAttemptID);
req.addAllAsks(requestedContainers);
req.addAllReleases(releasedContainers);
- req.setProgress((float) numCompletedContainers.get() / numTotalContainers);
+ req.setProgress((float) numCompletedContainers.get() / yarnArgs.numContainers);
logger.info("Sending request to RM for containers" + ", requestedSet=" + requestedContainers.size()
+ ", releasedSet=" + releasedContainers.size() + ", progress=" + req.getProgress());
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4CLIYarnArgs.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4CLIYarnArgs.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4CLIYarnArgs.java
new file mode 100644
index 0000000..2203fc0
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4CLIYarnArgs.java
@@ -0,0 +1,84 @@
+package org.apache.s4.tools.yarn;
+
+import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+
+/**
+ * Command-line arguments of the S4 YARN client ({@link S4YarnClient}), which submits an S4 application to YARN.
+ * <p>
+ * Adds to the parameters shared with the application master ({@link CommonS4YarnArgs}) those only the client
+ * needs: the S4 archive to deploy, the target queue, the memory of the application master, etc.
+ */
+@Parameters(separators = "=")
+class S4CLIYarnArgs extends CommonS4YarnArgs {
+
+    public static final String S4_YARN_MASTER_MEMORY = "-s4YarnMasterMemory";
+
+    public static final String QUEUE = "-queue";
+
+    public static final String S4_DIR = "-s4Dir";
+
+    @Parameter(names = "-appName", description = "Name of the application", required = false)
+    String appName = "S4App";
+
+    @Parameter(names = { "-nbTasks", "-nbPartitions" }, description = "Number of partitions of the S4 cluster", required = true)
+    int nbTasks;
+
+    // Not required: a required option never falls back to its default value, which would make 12000 unreachable.
+    @Parameter(names = { "-flp", "-firstListeningPort" }, description = "First listening port for S4 nodes", required = false)
+    int flp = 12000;
+
+    @Parameter(names = "-s4r", description = "URI to S4 archive (.s4r) file. It will be automatically deployed on the allocated S4 nodes. Examples: file:///home/s4/file.s4r or more probably hdfs://hostname:port/path/file.s4r", required = true)
+    String s4rPath;
+
+    @Parameter(names = S4_DIR, description = "S4 directory. It is used to resolve S4 platform libraries and dependencies, currently from s4dir/subprojects/s4-yarn/build/install/s4-yarn/lib", required = true, hidden = true)
+    String s4Dir;
+
+    @Parameter(names = QUEUE, description = "YARN parameter: RM Queue in which this application is to be submitted", required = false)
+    String queue = "default";
+
+    @Parameter(names = "-timeout", description = "YARN parameter: Application timeout in milliseconds (default is: -1 = no timeout)", required = false)
+    int timeout = -1;
+
+    @Parameter(names = { "-master_memory", S4_YARN_MASTER_MEMORY }, description = "YARN parameter: Amount of memory in MB to be requested to run the application master", required = false, validateWith = S4CLIYarnArgs.MemoryValidator.class)
+    int masterMemory = 256;
+
+    @Parameter(names = "-log_properties", description = "YARN parameter: log4j.properties file", required = false)
+    String logProperties = "";
+
+    /** Accepts memory sizes (in MB) that are integers greater than or equal to 0. */
+    public static class MemoryValidator implements IParameterValidator {
+
+        @Override
+        public void validate(String name, String value) throws ParameterException {
+            if (toInt(name, value) < 0) {
+                throw new ParameterException("Invalid memory size: " + value);
+            }
+        }
+    }
+
+    /** Accepts numbers of containers that are integers greater than or equal to 1. */
+    public static class NbContainersValidator implements IParameterValidator {
+        @Override
+        public void validate(String name, String value) throws ParameterException {
+            if (toInt(name, value) < 1) {
+                throw new ParameterException("Invalid number of containers: " + value);
+            }
+        }
+    }
+
+    /**
+     * Parses the raw text of an integer option. Malformed input is reported as a {@link ParameterException}, like
+     * any other invalid value, instead of letting a {@link NumberFormatException} escape from the validator.
+     */
+    private static int toInt(String name, String value) {
+        try {
+            return Integer.parseInt(value);
+        } catch (NumberFormatException e) {
+            throw new ParameterException("Invalid integer value for " + name + ": " + value);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
index 8eb5cce..a4b5a12 100644
--- a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
@@ -122,7 +122,7 @@ public class S4YarnClient extends YarnClientImpl {
// Configuration
private Configuration conf;
- YarnArgs yarnArgs;
+ S4CLIYarnArgs yarnArgs;
private int amMemory;
@@ -134,7 +134,7 @@ public class S4YarnClient extends YarnClientImpl {
*/
public static void main(String[] args) {
- YarnArgs yarnArgs = new YarnArgs();
+ S4CLIYarnArgs yarnArgs = new S4CLIYarnArgs();
logger.info("S4YarnClient args = " + Arrays.toString(args));
Tools.parseArgs(yarnArgs, args);
@@ -155,7 +155,8 @@ public class S4YarnClient extends YarnClientImpl {
return YARN_CONF_FILES.contains(pathname.getName());
}
}).length == 4)) {
- logger.error("The {} directory must contain files [core,hdfs,yarn,mapred]-site.xml");
+ logger.error("The {} directory must contain files [core,hdfs,yarn,mapred]-site.xml",
+ HADOOP_CONF_DIR_ENV);
System.exit(1);
}
@@ -177,7 +178,7 @@ public class S4YarnClient extends YarnClientImpl {
System.exit(1);
}
- public S4YarnClient(YarnArgs yarnArgs, Configuration conf) throws Exception {
+ public S4YarnClient(S4CLIYarnArgs yarnArgs, Configuration conf) throws Exception {
this.yarnArgs = yarnArgs;
this.conf = conf;
init(this.conf);
@@ -235,6 +236,9 @@ public class S4YarnClient extends YarnClientImpl {
// A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
// a multiple of the min value and cannot exceed the max.
// If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
+
+ amMemory = yarnArgs.masterMemory;
+
if (amMemory < minMem) {
logger.info("AM memory specified below min threshold of cluster. Using min value." + ", specified="
+ amMemory + ", min=" + minMem);
@@ -308,6 +312,7 @@ public class S4YarnClient extends YarnClientImpl {
logger.info("Setting up app master command");
// vargs.add("${JAVA_HOME}" + "/bin/java");
+ // TODO set java from JAVA_HOME
vargs.add("java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
@@ -316,27 +321,13 @@ public class S4YarnClient extends YarnClientImpl {
// Set Application Master class name
vargs.add(S4ApplicationMaster.class.getName());
// Set params for Application Master
- vargs.add("--container_memory " + String.valueOf(yarnArgs.containerMemory));
- vargs.add("--num_containers " + String.valueOf(yarnArgs.numContainers));
- vargs.add("--priority " + String.valueOf(yarnArgs.priority));
- if (!yarnArgs.extraModulesClasses.isEmpty()) {
- StringBuilder sb = new StringBuilder();
- sb.append("-extraModulesClasses ");
- ListIterator<String> it = yarnArgs.extraModulesClasses.listIterator();
- while (it.hasNext()) {
- sb.append(it.next() + (it.hasNext() ? "," : ""));
- }
- vargs.add(sb.toString());
- }
- if (!yarnArgs.extraNamedParameters.isEmpty()) {
- StringBuilder sb = new StringBuilder();
- sb.append("-namedStringParameters ");
- ListIterator<String> it = yarnArgs.extraNamedParameters.listIterator();
- while (it.hasNext()) {
- sb.append(it.next() + (it.hasNext() ? "," : ""));
- }
- vargs.add(sb.toString());
- }
+ vargs.add(CommonS4YarnArgs.S4_NODE_MEMORY + " " + String.valueOf(yarnArgs.containerMemory));
+ vargs.add(CommonS4YarnArgs.NB_S4_NODES + " " + String.valueOf(yarnArgs.numContainers));
+ vargs.add(CommonS4YarnArgs.PRIORITY + " " + String.valueOf(yarnArgs.priority));
+
+ addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.S4_NODE_JVM_PARAMETERS, ",", yarnArgs.extraS4NodeJVMParams);
+ addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.EXTRA_MODULES_CLASSES, ",", yarnArgs.extraModulesClasses);
+ addListElementsToCommandLineBuffer(vargs, "-namedStringParameters", ",", yarnArgs.extraNamedParameters);
vargs.add("-c " + String.valueOf(yarnArgs.cluster));
vargs.add("-zk " + String.valueOf(yarnArgs.zkString));
@@ -405,6 +396,23 @@ public class S4YarnClient extends YarnClientImpl {
}
+    /**
+     * Appends the elements of {@code yarnArg}, joined by {@code paramSeparator}, to {@code vargs} as one single
+     * token, prefixed with {@code paramName + "="} unless {@code paramName} is null or empty. Nothing is appended
+     * when {@code yarnArg} is null or empty.
+     */
+    public static void addListElementsToCommandLineBuffer(Vector<CharSequence> vargs, String paramName,
+            String paramSeparator, List<String> yarnArg) {
+        if (yarnArg == null || yarnArg.isEmpty()) {
+            return;
+        }
+        StringBuilder token = new StringBuilder(Strings.isNullOrEmpty(paramName) ? "" : paramName + "=");
+        for (int i = 0; i < yarnArg.size(); i++) {
+            token.append(i == 0 ? "" : paramSeparator).append(yarnArg.get(i));
+        }
+        vargs.add(token.toString());
+    }
+
private Path copyToLocalResources(ApplicationId appId, FileSystem fs, Map<String, LocalResource> localResources,
File file) throws IOException {
Path src = new Path(file.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/YarnArgs.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/YarnArgs.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/YarnArgs.java
deleted file mode 100644
index ac568b3..0000000
--- a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/YarnArgs.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.apache.s4.tools.yarn;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.core.Main.InlineConfigParameterConverter;
-
-import com.beust.jcommander.IParameterValidator;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.beust.jcommander.Parameters;
-
-@Parameters(commandNames = "yarn", separators = "=", commandDescription = "Deploy S4 application on YARN")
-class YarnArgs {
-
- @Parameter(names = "-appName", description = "Name of the application", required = false)
- String appName = "S4App";
-
- @Parameter(names = "-priority", description = "Application priority", required = false)
- int priority = 0;
-
- @Parameter(names = "-queue", description = "RM Queue in which this application is to be submitted", required = false)
- String queue = "default";
-
- @Parameter(names = "-user", description = "User to run the application as", required = false)
- String user = "";
-
- @Parameter(names = "-timeout", description = "Application timeout in milliseconds (default is: -1 = no timeout)", required = false)
- int timeout = -1;
-
- @Parameter(names = "-s4Dir", description = "S4 directory. It is used to resolve S4 platform libraries and dependencies, currently from s4dir/subprojects/s4-yarn/build/install/s4-yarn/lib", required = true, hidden = true)
- String s4Dir;
-
- public static class MemoryValidator implements IParameterValidator {
-
- @Override
- public void validate(String name, String value) throws ParameterException {
- if (Integer.valueOf(value).intValue() < 0) {
- throw new ParameterException("Invalid memory size: " + value);
- }
- }
- }
-
- @Parameter(names = "-master_memory", description = "Amount of memory in MB to be requested to run the application master", required = false, validateWith = YarnArgs.MemoryValidator.class)
- int masterMemory = 256;
-
- @Parameter(names = { "-cluster", "-c" }, description = "Name of the S4 logical cluster that will contain S4 nodes. NOTE: the cluster is currently defined automatically in ZooKeeper.", required = true)
- String cluster;
-
- @Parameter(names = { "-nbTasks", "-nbPartitions" }, description = "Number of partitions of the S4 cluster", required = true)
- int nbTasks;
-
- @Parameter(names = { "-flp", "-firstListeningPort" }, description = "First listening port for S4 nodes", required = true)
- int flp = 12000;
-
- @Parameter(names = "-s4r", description = "URI to S4 archive (.s4r) file. It will be automatically deployed on the allocated S4 nodes. Examples: file:///home/s4/file.s4r or more probably hdfs:///hostname:port/path/file.s4r", required = true)
- String s4rPath;
-
- @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false, hidden = false)
- List<String> extraModulesClasses = new ArrayList<String>();
-
- @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
- List<String> extraNamedParameters = new ArrayList<String>();
-
- @Parameter(names = "-container_memory", description = "Amount of memory in MB to be requested to host the S4 node", required = false, validateWith = YarnArgs.MemoryValidator.class)
- int containerMemory = 10;
-
- public static class NbContainersValidator implements IParameterValidator {
- @Override
- public void validate(String name, String value) throws ParameterException {
- if (Integer.valueOf(value).intValue() < 1) {
- throw new ParameterException("Invalid number of containers: " + value);
- }
- }
- }
-
- @Parameter(names = "-num_containers", description = "Number of containers on which the S4 node needs to be hosted (typically: at least as many partitions as the logical cluster)", validateWith = YarnArgs.NbContainersValidator.class)
- int numContainers = 1;
-
- @Parameter(names = "-zk", description = "S4 Zookeeper cluster manager connection string", required = true)
- String zkString;
-
- @Parameter(names = "-log_properties", description = "log4j.properties file", required = false)
- String logProperties = "";
-
- @Parameter(names = "-debug", description = "Dump out debug information")
- boolean debug;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b590862e/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java b/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java
index 639c566..771c929 100644
--- a/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java
+++ b/subprojects/s4-yarn/src/test/java/org/apache/s4/tools/yarn/TestYarnDeployment.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.io.Files;
import com.google.inject.Injector;
+// NOTE: these tests require an up-to-date installation of the s4-yarn application (this happens automatically when tests are run from the command line; otherwise run: gradlew s4-yarn:installApp)
public class TestYarnDeployment extends ZkBasedTest {
private static Logger logger = LoggerFactory.getLogger(TestYarnDeployment.class);
@@ -132,7 +133,7 @@ public class TestYarnDeployment extends ZkBasedTest {
final String[] params = ("-cluster=cluster1 -nbTasks=2 -flp=14000 -s4r=" + destS4rPath.toUri().toString()
+ " -zk=localhost:2181 -s4Dir=" + gradlewFile.getParentFile().getAbsolutePath()).split("[ ]");
- YarnArgs yarnArgs = new YarnArgs();
+ S4CLIYarnArgs yarnArgs = new S4CLIYarnArgs();
Tools.parseArgs(yarnArgs, params);
final S4YarnClient client = new S4YarnClient(yarnArgs, conf);