You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/04 18:39:14 UTC
[2/3] flink git commit: [FLINK-1631] [client] Overhaul of the client.
[FLINK-1631] [client] Overhaul of the client.
- Fix bugs with non-serializable messages
- Separate parser and action logic
- Clean up tests
- Vastly improve logging in CLI client
- Additional tests for parsing / config setup in the command line client
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5385e48d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5385e48d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5385e48d
Branch: refs/heads/master
Commit: 5385e48d94a2df81c8fd6102a889cf42dd93fe2f
Parents: 0333109
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 3 21:49:37 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 18:20:36 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 1211 ++++++++----------
.../apache/flink/client/cli/CancelOptions.java | 37 +
.../flink/client/cli/CliArgsException.java | 30 +
.../flink/client/cli/CliFrontendParser.java | 284 ++++
.../flink/client/cli/CommandLineOptions.java | 57 +
.../apache/flink/client/cli/InfoOptions.java | 30 +
.../apache/flink/client/cli/ListOptions.java | 46 +
.../apache/flink/client/cli/ProgramOptions.java | 97 ++
.../org/apache/flink/client/cli/RunOptions.java | 30 +
.../org/apache/flink/client/program/Client.java | 19 +-
.../CliFrontendAddressConfigurationTest.java | 180 +++
.../flink/client/CliFrontendInfoTest.java | 39 +-
.../CliFrontendJobManagerConnectionTest.java | 166 ---
.../flink/client/CliFrontendListCancelTest.java | 61 +-
.../client/CliFrontendPackageProgramTest.java | 223 ++--
.../apache/flink/client/CliFrontendRunTest.java | 14 +-
.../flink/client/CliFrontendTestUtils.java | 15 +-
.../ExecutionPlanAfterExecutionTest.java | 7 +
.../main/flink-bin/conf/log4j-cli.properties | 1 +
.../flink/runtime/client/JobStatusMessage.java | 59 +
.../flink/runtime/jobmanager/JobManager.scala | 29 +-
.../jobmanager/JobManagerCLIConfiguration.scala | 2 +-
.../runtime/messages/JobManagerMessages.scala | 33 +-
.../org/apache/flink/yarn/YarnTestBase.java | 9 +-
.../apache/flink/yarn/ApplicationMaster.scala | 12 +-
25 files changed, 1627 insertions(+), 1064 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e438de0..1d9d956 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -24,6 +24,8 @@ import java.io.FileNotFoundException;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,19 +40,20 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
+
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.MissingOptionException;
-import org.apache.commons.cli.MissingArgumentException;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.cli.UnrecognizedOptionException;
+import org.apache.flink.client.cli.CancelOptions;
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.InfoOptions;
+import org.apache.flink.client.cli.ListOptions;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -58,18 +61,23 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -80,201 +88,133 @@ import scala.concurrent.duration.FiniteDuration;
*/
public class CliFrontend {
- // run job by deploying Flink into a YARN cluster, if this string is specified as the jobmanager address
- public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
-
- // command line interface of the YARN session, with a special initialization here to prefix all options with y/yarn.
- private static FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn");
-
- //actions
+ // actions
private static final String ACTION_RUN = "run";
private static final String ACTION_INFO = "info";
private static final String ACTION_LIST = "list";
private static final String ACTION_CANCEL = "cancel";
- // general options
- private static final Option HELP_OPTION = new Option("h", "help", false, "Show the help message for the CLI Frontend or the action.");
-
- // program (jar file) specific options
- private static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
- private static final Option CLASS_OPTION = new Option("c", "class", true, "Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the JAR file does not specify the class in its manifest.");
- private static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true, "The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.");
- private static final Option ARGS_OPTION = new Option("a", "arguments", true, "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
-
- private static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, "Address of the JobManager (master) to which to connect. Specify '"+YARN_DEPLOY_JOBMANAGER+"' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a different JobManager than the one specified in the configuration.");
-
- // info specific options
-
- // list specific options
- private static final Option RUNNING_OPTION = new Option("r", "running", false, "Show only running programs and their JobIDs");
- private static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false, "Show only scheduled prorgrams and their JobIDs");
-
- // canceling options
-
- static {
- initOptions();
- }
-
- // action options all include the general options
- private static final Options RUN_OPTIONS = getRunOptions(createGeneralOptions());
- private static final Options INFO_OPTIONS = getInfoOptions(createGeneralOptions());
- private static final Options LIST_OPTIONS = getListOptions(createGeneralOptions());
- private static final Options CANCEL_OPTIONS = getCancelOptions(createGeneralOptions());
-
// config dir parameters
private static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR";
private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
- /**
- * YARN-session related constants
- */
+ // YARN-session related constants
public static final String YARN_PROPERTIES_FILE = ".yarn-properties";
public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
public static final String YARN_PROPERTIES_DOP = "degreeOfParallelism";
public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
- // this has to be a regex for String.split()
- public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
-
- private CommandLineParser parser;
-
- private boolean printHelp;
-
- private boolean globalConfigurationLoaded;
+ public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
- private boolean yarnPropertiesLoaded = false;
+ /**
+ * A special host name used to run a job by deploying Flink into a YARN cluster,
+ * if this string is specified as the JobManager address
+ */
+ public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
- private Properties yarnProperties;
- // this flag indicates if the given Job is executed using a YARN cluster,
- // started for this purpose.
- private boolean runInYarnCluster = false;
+ // --------------------------------------------------------------------------------------------
+ // --------------------------------------------------------------------------------------------
- private AbstractFlinkYarnCluster yarnCluster = null;
+ private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
- protected String configurationDirectory = null;
+ private final File configDirectory;
+ private final Configuration config;
- /**
- * Initializes the class
- */
- public CliFrontend() {
- parser = new PosixParser();
- }
-
- // --------------------------------------------------------------------------------------------
- // Setup of options
- // --------------------------------------------------------------------------------------------
+ private final FiniteDuration askTimeout;
+
+ private final FiniteDuration lookupTimeout;
+
+ private InetSocketAddress jobManagerAddress;
+
+ private ActorSystem actorSystem;
+
+ private AbstractFlinkYarnCluster yarnCluster;
- private static void initOptions() {
- HELP_OPTION.setRequired(false);
- JAR_OPTION.setRequired(false);
- JAR_OPTION.setArgName("jarfile");
-
- CLASS_OPTION.setRequired(false);
- CLASS_OPTION.setArgName("classname");
-
- ADDRESS_OPTION.setRequired(false);
- ADDRESS_OPTION.setArgName("host:port");
-
- PARALLELISM_OPTION.setRequired(false);
- PARALLELISM_OPTION.setArgName("parallelism");
-
- ARGS_OPTION.setRequired(false);
- ARGS_OPTION.setArgName("programArgs");
- ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
- RUNNING_OPTION.setRequired(false);
- SCHEDULED_OPTION.setRequired(false);
- }
-
- static Options createGeneralOptions() {
- Options options = new Options();
- options.addOption(HELP_OPTION);
- // backwards compatibility: ignore verbose flag (-v)
- options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
- return options;
- }
-
- // gets the program options with the old flags for jar file and arguments
- static Options getProgramSpecificOptions(Options options) {
- options.addOption(JAR_OPTION);
- options.addOption(CLASS_OPTION);
- options.addOption(PARALLELISM_OPTION);
- options.addOption(ARGS_OPTION);
-
- // also add the YARN options so that the parser can parse them
- yarnSessionCLi.getYARNSessionCLIOptions(options);
- return options;
- }
-
- // gets the program options without the old flags for jar file and arguments
- static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
- options.addOption(CLASS_OPTION);
- options.addOption(PARALLELISM_OPTION);
- return options;
- }
-
- /**
- * Builds command line options for the run action.
- *
- * @return Command line options for the run action.
- */
- static Options getRunOptions(Options options) {
- Options o = getProgramSpecificOptions(options);
- return getJobManagerAddressOption(o);
- }
-
- static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
- Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
- return getJobManagerAddressOption(o);
- }
-
- static Options getJobManagerAddressOption(Options options) {
- options.addOption(ADDRESS_OPTION);
- return options;
- }
-
- /**
- * Builds command line options for the info action.
- *
- * @return Command line options for the info action.
- */
- static Options getInfoOptions(Options options) {
- options = getProgramSpecificOptions(options);
- options = getJobManagerAddressOption(options);
- return options;
- }
-
- static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
- options = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
- options = getJobManagerAddressOption(options);
- return options;
- }
-
/**
- * Builds command line options for the list action.
- *
- * @return Command line options for the list action.
+ *
+ * @throws Exception Thrown if the configuration directory was not found, the configuration could not
+ * be loaded, or the YARN properties could not be parsed.
*/
- static Options getListOptions(Options options) {
- options.addOption(RUNNING_OPTION);
- options.addOption(SCHEDULED_OPTION);
- options = getJobManagerAddressOption(options);
- return options;
+ public CliFrontend() throws Exception {
+ this(getConfigurationDirectoryFromEnv());
}
-
- /**
- * Builds command line options for the cancel action.
- *
- * @return Command line options for the cancel action.
- */
- static Options getCancelOptions(Options options) {
- options = getJobManagerAddressOption(options);
- return options;
+
+ public CliFrontend(String configDir) throws Exception {
+
+ // configure the config directory
+ this.configDirectory = new File(configDir);
+ LOG.info("Using configuration directory " + this.configDirectory.getAbsolutePath());
+
+ // load the configuration
+ LOG.info("Trying to load configuration file");
+ GlobalConfiguration.loadConfiguration(this.configDirectory.getAbsolutePath());
+ this.config = GlobalConfiguration.getConfiguration();
+
+ // load the YARN properties
+ File propertiesFile = new File(configDirectory, YARN_PROPERTIES_FILE);
+ if (propertiesFile.exists()) {
+
+ logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
+
+ Properties yarnProperties = new Properties();
+ try {
+ InputStream is = new FileInputStream(propertiesFile);
+ try {
+ yarnProperties.load(is);
+ }
+ finally {
+ is.close();
+ }
+ }
+ catch (IOException e) {
+ throw new Exception("Cannot read the YARN properties file", e);
+ }
+
+ // configure the default degree of parallelism from YARN
+ String propDegree = yarnProperties.getProperty(YARN_PROPERTIES_DOP);
+ if (propDegree != null) { // maybe the property is not set
+ try {
+ int paraDegree = Integer.parseInt(propDegree);
+ this.config.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree);
+
+ logAndSysout("YARN properties set default parallelism to " + paraDegree);
+ }
+ catch (NumberFormatException e) {
+ throw new Exception("Error while parsing the YARN properties: " +
+ "Property " + YARN_PROPERTIES_DOP + " is not an integer.");
+ }
+ }
+
+ // get the JobManager address from the YARN properties
+ String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+ if (address != null) {
+ try {
+ jobManagerAddress = parseJobManagerAddress(address);
+ }
+ catch (Exception e) {
+ throw new Exception("YARN properties contain an invalid entry for JobManager address.", e);
+ }
+
+ logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
+ }
+
+ // handle the YARN client's dynamic properties
+ String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+ List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
+ for (Tuple2<String, String> dynamicProperty : dynamicProperties) {
+ this.config.setString(dynamicProperty.f0, dynamicProperty.f1);
+ }
+ }
+
+ this.askTimeout = AkkaUtils.getTimeout(config);
+ this.lookupTimeout = AkkaUtils.getLookupTimeout(config);
}
+
// --------------------------------------------------------------------------------------------
// Execute Actions
@@ -286,182 +226,137 @@ public class CliFrontend {
* @param args Command line arguments for the run action.
*/
protected int run(String[] args) {
- // Parse command line options
- CommandLine line;
+ LOG.info("Running 'run' command.");
+
+ RunOptions options;
try {
- line = parser.parse(RUN_OPTIONS, args, true);
- evaluateGeneralOptions(line);
+ options = CliFrontendParser.parseRunCommand(args);
}
- catch (MissingOptionException e) {
+ catch (CliArgsException e) {
return handleArgException(e);
}
- catch (MissingArgumentException e) {
- return handleArgException(e);
- }
- catch (UnrecognizedOptionException e) {
- return handleArgException(e);
- }
- catch (Exception e) {
- return handleError(e);
+ catch (Throwable t) {
+ return handleError(t);
}
-
- // ------------ check for help first --------------
-
- if (printHelp) {
- printHelpForRun();
+
+ // evaluate help flag
+ if (options.isPrintHelp()) {
+ CliFrontendParser.printHelpForRun();
return 0;
}
+ if (options.getJarFilePath() == null) {
+ return handleArgException(new CliArgsException("The program JAR file was not specified."));
+ }
+
PackagedProgram program;
- Client client;
try {
- program = buildProgram(line);
- client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName());
- } catch (FileNotFoundException e) {
+ LOG.info("Building program from JAR file");
+ program = buildProgram(options);
+ }
+ catch (FileNotFoundException e) {
return handleArgException(e);
- } catch (ProgramInvocationException e) {
+ }
+ catch (ProgramInvocationException e) {
return handleError(e);
- } catch (Throwable t) {
+ }
+ catch (Throwable t) {
return handleError(t);
}
- int parallelism = -1;
- if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
- String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
- try {
- parallelism = Integer.parseInt(parString);
- } catch (NumberFormatException e) {
- System.out.println("The value " + parString + " is invalid for the degree of parallelism.");
- return 1;
- }
+ try {
+ Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
- if (parallelism <= 0) {
- System.out.println("Invalid value for the degree-of-parallelism. Parallelism must be greater than zero.");
- return 1;
- }
- }
+ int parallelism = options.getParallelism();
+ int exitCode = executeProgram(program, client, parallelism);
- int exitCode = executeProgram(program, client, parallelism);
+ if (yarnCluster != null) {
+ List<String> msgs = yarnCluster.getNewMessages();
+ if (msgs != null && msgs.size() > 1) {
- if(runInYarnCluster) {
- List<String> msgs = yarnCluster.getNewMessages();
- if(msgs != null && msgs.size() > 1) {
- System.out.println("The following messages were created by the YARN cluster while running the Job:");
- for(String msg : msgs) {
- System.out.println(msg);
+ logAndSysout("The following messages were created by the YARN cluster while running the Job:");
+ for (String msg : msgs) {
+ logAndSysout(msg);
+ }
+ }
+ if (yarnCluster.hasFailed()) {
+ logAndSysout("YARN cluster is in failed state!");
+ logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
}
}
- if(yarnCluster.hasFailed()) {
- System.out.println("YARN cluster is in failed state!");
- System.out.println("YARN Diagnostics: " + yarnCluster.getDiagnostics());
- }
- System.out.println("Shutting down YARN cluster");
- yarnCluster.shutdown();
- }
-
- return exitCode;
- }
- // --------------------------------------------------------------------------------------------
-
- protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
- JobExecutionResult execResult;
- try {
- client.setPrintStatusDuringExecution(true);
- execResult = client.run(program, parallelism, true);
+ return exitCode;
}
- catch (ProgramInvocationException e) {
- return handleError(e);
+ catch (Throwable t) {
+ return handleError(t);
}
finally {
- program.deleteExtractedLibraries();
- }
-
- // we come here after the job has finished
- if (execResult != null) {
- System.out.println("Job Runtime: " + execResult.getNetRuntime());
- Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
- if (accumulatorsResult.size() > 0) {
- System.out.println("Accumulator Results: ");
- System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+ if (yarnCluster != null) {
+ logAndSysout("Shutting down YARN cluster");
+ yarnCluster.shutdown();
+ }
+ if (program != null) {
+ program.deleteExtractedLibraries();
}
}
- return 0;
}
-
- // --------------------------------------------------------------------------------------------
-
+
/**
* Executes the info action.
*
* @param args Command line arguments for the info action.
*/
protected int info(String[] args) {
+ LOG.info("Running 'info' command.");
+
// Parse command line options
- CommandLine line;
+ InfoOptions options;
try {
- line = parser.parse(INFO_OPTIONS, args, false);
- evaluateGeneralOptions(line);
- }
- catch (MissingOptionException e) {
- return handleArgException(e);
+ options = CliFrontendParser.parseInfoCommand(args);
}
- catch (MissingArgumentException e) {
+ catch (CliArgsException e) {
return handleArgException(e);
}
- catch (UnrecognizedOptionException e) {
- return handleArgException(e);
- }
- catch (Exception e) {
- return handleError(e);
+ catch (Throwable t) {
+ return handleError(t);
}
- if (printHelp) {
- printHelpForInfo();
+ // evaluate help flag
+ if (options.isPrintHelp()) {
+ CliFrontendParser.printHelpForInfo();
return 0;
}
+ if (options.getJarFilePath() == null) {
+ return handleArgException(new CliArgsException("The program JAR file was not specified."));
+ }
+
// -------- build the packaged program -------------
PackagedProgram program;
try {
- program = buildProgram(line);
- } catch (FileNotFoundException e) {
- return handleError(e);
- } catch (ProgramInvocationException e) {
- return handleError(e);
- } catch (Throwable t) {
- return handleError(t);
+ LOG.info("Building program from JAR file");
+ program = buildProgram(options);
}
-
- int parallelism = -1;
- if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
- String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
- try {
- parallelism = Integer.parseInt(parString);
- } catch (NumberFormatException e) {
- System.out.println("The value " + parString + " is invalid for the degree of parallelism.");
- return 1;
- }
-
- if (parallelism <= 0) {
- System.out.println("Invalid value for the degree-of-parallelism. Parallelism must be greater than zero.");
- return 1;
- }
+ catch (Throwable t) {
+ return handleError(t);
}
try {
- Client client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName());
+ int parallelism = options.getParallelism();
+
+ LOG.info("Creating program plan dump");
+ Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
String jsonPlan = client.getOptimizedPlanAsJson(program, parallelism);
if (jsonPlan != null) {
System.out.println("----------------------- Execution Plan -----------------------");
System.out.println(jsonPlan);
System.out.println("--------------------------------------------------------------");
- } else {
- System.out.println("JSON plan could not be compiled.");
}
-
+ else {
+ System.out.println("JSON plan could not be generated.");
+ }
return 0;
}
catch (Throwable t) {
@@ -478,33 +373,27 @@ public class CliFrontend {
* @param args Command line arguments for the list action.
*/
protected int list(String[] args) {
- // Parse command line options
- CommandLine line;
+ LOG.info("Running 'list' command.");
+
+ ListOptions options;
try {
- line = parser.parse(LIST_OPTIONS, args, false);
- evaluateGeneralOptions(line);
- }
- catch (MissingOptionException e) {
- return handleArgException(e);
+ options = CliFrontendParser.parseListCommand(args);
}
- catch (MissingArgumentException e) {
+ catch (CliArgsException e) {
return handleArgException(e);
}
- catch (UnrecognizedOptionException e) {
- return handleArgException(e);
- }
- catch (Exception e) {
- return handleError(e);
+ catch (Throwable t) {
+ return handleError(t);
}
-
- if (printHelp) {
- printHelpForList();
+
+ // evaluate help flag
+ if (options.isPrintHelp()) {
+ CliFrontendParser.printHelpForList();
return 0;
}
-
- // get list options
- boolean running = line.hasOption(RUNNING_OPTION.getOpt());
- boolean scheduled = line.hasOption(SCHEDULED_OPTION.getOpt());
+
+ boolean running = options.getRunning();
+ boolean scheduled = options.getScheduled();
// print running and scheduled jobs if not option supplied
if (!running && !scheduled) {
@@ -513,87 +402,87 @@ public class CliFrontend {
}
try {
- ActorRef jobManager = getJobManager(line, getGlobalConfiguration());
- if (jobManager == null) {
- return 1;
- }
+ ActorRef jobManager = getJobManager(options);
- final Future<Object> response = Patterns.ask(jobManager,
- JobManagerMessages.getRequestRunningJobs(), new Timeout(getAkkaTimeout()));
+ LOG.info("Connecting to JobManager to retrieve list of jobs");
+ Future<Object> response = Patterns.ask(jobManager,
+ JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
Object result;
try {
- result = Await.result(response, getAkkaTimeout());
- } catch (Exception exception) {
- throw new IOException("Could not retrieve running jobs from job manager.",
- exception);
+ result = Await.result(response, askTimeout);
+ }
+ catch (Exception e) {
+ throw new Exception("Could not retrieve running jobs from the JobManager.", e);
}
- if (!(result instanceof RunningJobs)) {
- throw new RuntimeException("ReqeustRunningJobs requires a response of type " +
- "RunningJobs. Instead the response is of type " + result.getClass() + ".");
- } else {
- Iterable<ExecutionGraph> jobs = ((RunningJobs) result).asJavaIterable();
+ if (result instanceof RunningJobsStatus) {
+ LOG.info("Successfully retrieved list of jobs");
+
+ List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
- ArrayList<ExecutionGraph> runningJobs = null;
- ArrayList<ExecutionGraph> scheduledJobs = null;
+ ArrayList<JobStatusMessage> runningJobs = null;
+ ArrayList<JobStatusMessage> scheduledJobs = null;
if (running) {
- runningJobs = new ArrayList<ExecutionGraph>();
+ runningJobs = new ArrayList<JobStatusMessage>();
}
if (scheduled) {
- scheduledJobs = new ArrayList<ExecutionGraph>();
+ scheduledJobs = new ArrayList<JobStatusMessage>();
}
- for (ExecutionGraph rj : jobs) {
-
- if (running && rj.getState().equals(JobStatus.RUNNING)) {
+ for (JobStatusMessage rj : jobs) {
+ if (running && rj.getJobState().equals(JobStatus.RUNNING)) {
runningJobs.add(rj);
}
- if (scheduled && rj.getState().equals(JobStatus.CREATED)) {
+ if (scheduled && rj.getJobState().equals(JobStatus.CREATED)) {
scheduledJobs.add(rj);
}
}
SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
- Comparator<ExecutionGraph> njec = new Comparator<ExecutionGraph>(){
-
+ Comparator<JobStatusMessage> njec = new Comparator<JobStatusMessage>(){
@Override
- public int compare(ExecutionGraph o1, ExecutionGraph o2) {
- return (int)(o1.getStatusTimestamp(o1.getState())-o2.getStatusTimestamp(o2
- .getState()));
+ public int compare(JobStatusMessage o1, JobStatusMessage o2) {
+ return (int)(o1.getStartTime()-o2.getStartTime());
}
};
if (running) {
if(runningJobs.size() == 0) {
System.out.println("No running jobs.");
- } else {
+ }
+ else {
Collections.sort(runningJobs, njec);
System.out.println("------------------------ Running Jobs ------------------------");
- for(ExecutionGraph rj : runningJobs) {
- System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
- +" : "+rj.getJobID().toString()+" : "+rj.getJobName());
+ for (JobStatusMessage rj : runningJobs) {
+ System.out.println(df.format(new Date(rj.getStartTime()))
+ + " : " + rj.getJobId() + " : " + rj.getJobName());
}
System.out.println("--------------------------------------------------------------");
}
}
if (scheduled) {
- if(scheduledJobs.size() == 0) {
+ if (scheduledJobs.size() == 0) {
System.out.println("No scheduled jobs.");
- } else {
+ }
+ else {
Collections.sort(scheduledJobs, njec);
System.out.println("----------------------- Scheduled Jobs -----------------------");
- for(ExecutionGraph rj : scheduledJobs) {
- System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
- +" : "+rj.getJobID().toString()+" : "+rj.getJobName());
+ for(JobStatusMessage rj : scheduledJobs) {
+ System.out.println(df.format(new Date(rj.getStartTime()))
+ + " : " + rj.getJobId() + " : " + rj.getJobName());
}
System.out.println("--------------------------------------------------------------");
}
}
return 0;
}
+ else {
+ throw new Exception("ReqeustRunningJobs requires a response of type " +
+ "RunningJobs. Instead the response is of type " + result.getClass() + ".");
+ }
}
catch (Throwable t) {
return handleError(t);
@@ -601,452 +490,299 @@ public class CliFrontend {
}
/**
- * Executes the cancel action.
+ * Executes the CANCEL action.
*
* @param args Command line arguments for the cancel action.
*/
protected int cancel(String[] args) {
- // Parse command line options
- CommandLine line;
+ LOG.info("Running 'cancel' command.");
+
+ CancelOptions options;
try {
- line = parser.parse(CANCEL_OPTIONS, args, false);
- evaluateGeneralOptions(line);
- }
- catch (MissingOptionException e) {
- return handleArgException(e);
+ options = CliFrontendParser.parseCancelCommand(args);
}
- catch (MissingArgumentException e) {
+ catch (CliArgsException e) {
return handleArgException(e);
}
- catch (UnrecognizedOptionException e) {
- return handleArgException(e);
- }
- catch (Exception e) {
- return handleError(e);
+ catch (Throwable t) {
+ return handleError(t);
}
-
- if (printHelp) {
- printHelpForCancel();
+
+ // evaluate help flag
+ if (options.isPrintHelp()) {
+ CliFrontendParser.printHelpForCancel();
return 0;
}
- String[] cleanedArgs = line.getArgs();
+ String[] cleanedArgs = options.getArgs();
JobID jobId;
if (cleanedArgs.length > 0) {
String jobIdString = cleanedArgs[0];
try {
jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
- } catch (Exception e) {
+ }
+ catch (Exception e) {
+ LOG.error("Error: The value for the Job ID is not a valid ID.");
System.out.println("Error: The value for the Job ID is not a valid ID.");
return 1;
}
- } else {
+ }
+ else {
+ LOG.error("Missing JobID in the command line arguments.");
System.out.println("Error: Specify a Job ID to cancel a job.");
return 1;
}
try {
- ActorRef jobManager = getJobManager(line, getGlobalConfiguration());
-
- if (jobManager == null) {
- return 1;
- }
-
- final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId),
- new Timeout(getAkkaTimeout()));
+ ActorRef jobManager = getJobManager(options);
+ Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
try {
- Await.ready(response, getAkkaTimeout());
- } catch (Exception exception) {
- throw new IOException("Canceling the job with job ID " + jobId + " failed.",
- exception);
+ Await.result(response, askTimeout);
+ return 0;
+ }
+ catch (Exception e) {
+ throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
}
-
- return 0;
}
catch (Throwable t) {
return handleError(t);
}
}
+ // --------------------------------------------------------------------------------------------
+ // Interaction with programs and JobManager
+ // --------------------------------------------------------------------------------------------
+
+ protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
+ LOG.info("Starting execution or program");
+ JobExecutionResult execResult;
+ try {
+ client.setPrintStatusDuringExecution(true);
+ execResult = client.run(program, parallelism, true);
+ }
+ catch (ProgramInvocationException e) {
+ return handleError(e);
+ }
+ finally {
+ program.deleteExtractedLibraries();
+ }
+
+ LOG.info("Program execution finished");
+
+ // we come here after the job has finished
+ if (execResult != null) {
+ System.out.println("Job Runtime: " + execResult.getNetRuntime());
+ Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
+ if (accumulatorsResult.size() > 0) {
+ System.out.println("Accumulator Results: ");
+ System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+ }
+ }
+ return 0;
+ }
+
/**
- * @param line
- *
+ * Creates a Packaged program from the given command line options.
+ *
* @return A PackagedProgram (upon success)
* @throws java.io.FileNotFoundException, org.apache.flink.client.program.ProgramInvocationException, java.lang.Throwable
*/
- protected PackagedProgram buildProgram(CommandLine line) throws FileNotFoundException, ProgramInvocationException {
- String[] programArgs = line.hasOption(ARGS_OPTION.getOpt()) ?
- line.getOptionValues(ARGS_OPTION.getOpt()) :
- line.getArgs();
-
- // take the jar file from the option, or as the first trailing parameter (if available)
- String jarFilePath;
- if (line.hasOption(JAR_OPTION.getOpt())) {
- jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
- }
- else if (programArgs.length > 0) {
- jarFilePath = programArgs[0];
- programArgs = Arrays.copyOfRange(programArgs, 1, programArgs.length);
- }
- else {
- throw new FileNotFoundException("Error: Jar file was not specified.");
+ protected PackagedProgram buildProgram(ProgramOptions options)
+ throws FileNotFoundException, ProgramInvocationException
+ {
+ String[] programArgs = options.getProgramArgs();
+ String jarFilePath = options.getJarFilePath();
+
+ if (jarFilePath == null) {
+ throw new IllegalArgumentException("The program JAR file was not specified.");
}
-
+
File jarFile = new File(jarFilePath);
// Check if JAR file exists
if (!jarFile.exists()) {
- throw new FileNotFoundException("Error: Jar file does not exist: " + jarFile);
+ throw new FileNotFoundException("JAR file does not exist: " + jarFile);
}
else if (!jarFile.isFile()) {
- throw new FileNotFoundException("Error: Jar file is not a file: " + jarFile);
+ throw new FileNotFoundException("JAR file is not a file: " + jarFile);
}
// Get assembler class
- String entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
- line.getOptionValue(CLASS_OPTION.getOpt()) :
- null;
+ String entryPointClass = options.getEntryPointClassName();
return entryPointClass == null ?
new PackagedProgram(jarFile, programArgs) :
new PackagedProgram(jarFile, entryPointClass, programArgs);
}
-
- protected String getJobManagerAddressString(CommandLine line) throws IOException {
- Configuration configuration = getGlobalConfiguration();
-
- // first, check if the address comes from the command line option
- if (line.hasOption(ADDRESS_OPTION.getOpt())) {
+
+
+ protected InetSocketAddress getJobManagerAddress(CommandLineOptions options) throws Exception {
+
+ // first, check if the address is specified as an option
+ if (options.getJobManagerAddress() != null) {
+ return parseJobManagerAddress(options.getJobManagerAddress());
+ }
+
+ // second, check whether the address was already parsed, or configured through the YARN properties
+ if (jobManagerAddress == null) {
+ // config file must have the address
+ String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+
+ // verify that there is a jobmanager address and port in the configuration
+ if (jobManagerHost == null) {
+ throw new Exception("Found no configuration in the config directory '" + configDirectory
+ + "' that specifies the JobManager address.");
+ }
+
+ int jobManagerPort;
try {
- return line.getOptionValue(ADDRESS_OPTION.getOpt());
+ jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
}
- catch (Exception e) {
- System.out.println("Error: The JobManager address has an invalid format. " + e.getMessage());
- return null;
+ catch (NumberFormatException e) {
+ throw new Exception("Invalid value for the JobManager port (" +
+ ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + ") in the configuration.");
}
- }
- else {
- Properties yarnProps = getYarnProperties();
- if(yarnProps != null) {
- try {
- String address = yarnProps.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
- System.out.println("Found a yarn properties file (" + YARN_PROPERTIES_FILE + ") file, "
- + "using \""+address+"\" to connect to the JobManager");
- return address;
- } catch (Exception e) {
- System.out.println("Found a yarn properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager address from the file. "
- + e.getMessage());
- return null;
- }
- } else {
- // regular config file gives the address
- String jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-
- // verify that there is a jobmanager address and port in the configuration
- if (jobManagerAddress == null) {
- System.out.println("Error: Found no configuration in the config directory '" +
- getConfigurationDirectory() + "' that specifies the JobManager address.");
- return null;
- }
-
- int jobManagerPort;
- try {
- jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
- } catch (NumberFormatException e) {
- System.out.println("Invalid value for the JobManager IPC port (" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
- ") in the configuration.");
- return null;
- }
-
- if (jobManagerPort == -1) {
- System.out.println("Error: Found no configuration in the config directory '" +
- getConfigurationDirectory() + "' that specifies the JobManager port.");
- return null;
- }
-
- return jobManagerAddress + ":" + jobManagerPort;
+
+ if (jobManagerPort == -1) {
+ throw new Exception("Found no configuration in the config directory '" + configDirectory
+ + "' that specifies the JobManager port.");
}
- }
- }
-
- protected ActorRef getJobManager(CommandLine line, Configuration config) throws IOException {
- //TODO: Get ActorRef from YarnCluster if we are in YARN mode.
- String jobManagerAddressStr = getJobManagerAddressString(line);
- if (jobManagerAddressStr == null) {
- return null;
- }
- final ActorSystem actorSystem;
- try {
- scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
- actorSystem = AkkaUtils.createActorSystem(config, new Some<scala.Tuple2<String, Object>>(systemEndpoint));
- }
- catch (Exception e) {
- throw new IOException("Could not start actor system to communicate with JobManager", e);
+ jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
}
- try {
- InetSocketAddress address = RemoteExecutor.getInetFromHostport(jobManagerAddressStr);
- return JobManager.getJobManagerRemoteReference(address, actorSystem, config);
- }
- finally {
- actorSystem.shutdown();
- }
+ return jobManagerAddress;
}
+ protected ActorRef getJobManager(CommandLineOptions options) throws Exception {
+ //TODO: Get ActorRef from YarnCluster if we are in YARN mode.
- public String getConfigurationDirectory() {
- if (configurationDirectory == null) {
- configurationDirectory = getConfigurationDirectoryFromEnv();
- }
- return configurationDirectory;
- }
+ InetSocketAddress address = getJobManagerAddress(options);
- /**
- * Reads configuration settings. The default path can be overridden
- * by setting the ENV variable "FLINK_CONF_DIR".
- *
- * @return Flink's global configuration
- */
- protected Configuration getGlobalConfiguration() {
- if (!globalConfigurationLoaded) {
- String location = getConfigurationDirectory();
- GlobalConfiguration.loadConfiguration(location);
- // set default parallelization degree
- Properties yarnProps;
+ // start an actor system if needed
+ if (this.actorSystem == null) {
+ LOG.info("Starting actor system to communicate with JobManager");
try {
- yarnProps = getYarnProperties();
- if(yarnProps != null) {
- String propDegree = yarnProps.getProperty(YARN_PROPERTIES_DOP);
- int paraDegree = -1;
- if(propDegree != null) { // maybe the property is not set
- paraDegree = Integer.valueOf(propDegree);
- }
- Configuration c = GlobalConfiguration.getConfiguration();
- if(paraDegree != -1) {
- c.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree);
- }
- // handle the YARN client's dynamic properties
- String dynamicPropertiesEncoded = yarnProps.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
- List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
- for(Tuple2<String, String> dynamicProperty : dynamicProperties) {
- c.setString(dynamicProperty.f0, dynamicProperty.f1);
- }
- GlobalConfiguration.includeConfiguration(c); // update config
- }
- } catch (IOException e) {
- e.printStackTrace();
- System.err.println("Error while loading YARN properties: " + e.getMessage());
+ scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
+ this.actorSystem = AkkaUtils.createActorSystem(config,
+ new Some<scala.Tuple2<String, Object>>(systemEndpoint));
+ }
+ catch (Exception e) {
+ throw new IOException("Could not start actor system to communicate with JobManager", e);
}
- globalConfigurationLoaded = true;
+ LOG.info("Actor system successfully started");
}
- return GlobalConfiguration.getConfiguration();
- }
- public static String getConfigurationDirectoryFromEnv() {
- String location;
- if (System.getenv(ENV_CONFIG_DIRECTORY) != null) {
- location = System.getenv(ENV_CONFIG_DIRECTORY);
- } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
- location = CONFIG_DIRECTORY_FALLBACK_1;
- } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
- location = CONFIG_DIRECTORY_FALLBACK_2;
- } else {
- throw new RuntimeException("The configuration directory was not found. Please configure the '" +
- ENV_CONFIG_DIRECTORY + "' environment variable properly.");
- }
- return location;
- }
- protected FiniteDuration getAkkaTimeout(){
- Configuration config = getGlobalConfiguration();
- return AkkaUtils.getTimeout(config);
+ LOG.info("Trying to lookup JobManager");
+ ActorRef jmActor = JobManager.getJobManagerRemoteReference(address, actorSystem, lookupTimeout);
+ LOG.info("JobManager is at " + jmActor.path());
+ return jmActor;
}
- public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
- List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
- if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
- String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
- for(String propLine : propertyLines) {
- if(propLine == null) {
- continue;
- }
- String[] kv = propLine.split("=");
- if(kv != null && kv[0] != null && kv[1] != null && kv[0].length() > 0) {
- ret.add(new Tuple2<String, String>(kv[0], kv[1]));
- }
- }
- }
- return ret;
- }
-
- protected Properties getYarnProperties() throws IOException {
- if(!yarnPropertiesLoaded) {
- String loc = getConfigurationDirectory();
- File propertiesFile = new File(loc + '/' + YARN_PROPERTIES_FILE);
- if (propertiesFile.exists()) {
- Properties props = new Properties();
- InputStream is = new FileInputStream( propertiesFile );
- props.load(is);
- yarnProperties = props;
- is.close();
- } else {
- yarnProperties = null;
- }
- yarnPropertiesLoaded = true;
- }
- return yarnProperties;
- }
+
- protected Client getClient(CommandLine line, ClassLoader classLoader, String programName) throws IOException {
- String jmAddrString = getJobManagerAddressString(line);
- InetSocketAddress jobManagerAddress = null;
- if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
- System.out.println("YARN cluster mode detected. Switching Log4j output to console");
+ protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName) throws Exception {
+
+ InetSocketAddress jobManagerAddress;
+
+ if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
+ logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
- this.runInYarnCluster = true;
// user wants to run Flink in YARN cluster.
- AbstractFlinkYarnClient flinkYarnClient = yarnSessionCLi.createFlinkYarnClient(line);
- if(flinkYarnClient == null) {
+ CommandLine commandLine = options.getCommandLine();
+ AbstractFlinkYarnClient flinkYarnClient =
+ CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
+
+ if (flinkYarnClient == null) {
throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
}
try {
- yarnCluster = flinkYarnClient.deploy("Flink Application: "+programName);
- } catch(Exception e) {
+ yarnCluster = flinkYarnClient.deploy("Flink Application: " + programName);
+ }
+ catch(Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
}
+
jobManagerAddress = yarnCluster.getJobManagerAddress();
- System.out.println("YARN cluster started");
- System.out.println("JobManager web interface address "+yarnCluster.getWebInterfaceURL());
- System.out.println("Waiting until all TaskManagers have connected");
+
+ logAndSysout("YARN cluster started");
+ logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
+ logAndSysout("Waiting until all TaskManagers have connected");
+
while(true) {
FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
- if(status != null) {
+ if (status != null) {
if (status.getNumberOfTaskManagers() < flinkYarnClient.getTaskManagerCount()) {
- System.out.println("TaskManager status (" + status.getNumberOfTaskManagers()+"/"+flinkYarnClient.getTaskManagerCount()+")");
+ logAndSysout("TaskManager status (" + status.getNumberOfTaskManagers() + "/" + flinkYarnClient.getTaskManagerCount() + ")");
} else {
- System.out.println("Enough TaskManagers are connected");
+ logAndSysout("All TaskManagers are connected");
break;
}
} else {
- System.out.println("No status updates from YARN cluster received so far. Waiting ...");
+ logAndSysout("No status updates from YARN cluster received so far. Waiting ...");
}
+
try {
Thread.sleep(500);
- } catch (InterruptedException e) {
- System.err.println("Thread as interrupted"); Thread.currentThread().interrupt();
+ }
+ catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for TaskManagers");
+ System.err.println("Thread is interrupted");
+ Thread.currentThread().interrupt();
}
}
- } else {
- jobManagerAddress = RemoteExecutor.getInetFromHostport(jmAddrString);
}
- return new Client(jobManagerAddress, getGlobalConfiguration(), classLoader);
+ else {
+ jobManagerAddress = getJobManagerAddress(options);
+ }
+ return new Client(jobManagerAddress, config, classLoader);
}
+ // --------------------------------------------------------------------------------------------
+ // Logging and Exception Handling
+ // --------------------------------------------------------------------------------------------
+
/**
- * Prints the help for the client.
+ * Displays an exception message for incorrect command line arguments.
+ *
+ * @param e The exception to display.
+ * @return The return code for the process.
*/
- private void printHelp() {
- System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
- System.out.println();
- System.out.println("The following actions are available:");
-
- /* The only general option is -h and the help messages are always printed on errors.
- HelpFormatter formatter = new HelpFormatter();
- formatter.setWidth(80);
- formatter.setLeftPadding(5);
- formatter.setSyntaxPrefix(" general options:");
- formatter.printHelp(" ", GENERAL_OPTIONS);
- */
-
- printHelpForRun();
- printHelpForInfo();
- printHelpForList();
- printHelpForCancel();
-
- System.out.println();
- }
-
- private void printHelpForRun() {
- HelpFormatter formatter = new HelpFormatter();
- formatter.setLeftPadding(5);
- formatter.setWidth(80);
-
- System.out.println("\nAction \"run\" compiles and runs a program.");
- System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>");
- formatter.setSyntaxPrefix(" \"run\" action options:");
- formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
- formatter.setSyntaxPrefix(" Additional arguments if -m "+YARN_DEPLOY_JOBMANAGER+" is set:");
- Options yarnOpts = new Options();
- yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
- formatter.printHelp(" ", yarnOpts);
- System.out.println();
- }
-
- private void printHelpForInfo() {
- HelpFormatter formatter = new HelpFormatter();
- formatter.setLeftPadding(5);
- formatter.setWidth(80);
-
- System.out.println("\nAction \"info\" shows the optimized execution plan of the program (JSON).");
- System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>");
- formatter.setSyntaxPrefix(" \"info\" action options:");
- formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
- System.out.println();
- }
-
- private void printHelpForList() {
- HelpFormatter formatter = new HelpFormatter();
- formatter.setLeftPadding(5);
- formatter.setWidth(80);
-
- System.out.println("\nAction \"list\" lists running and scheduled programs.");
- System.out.println("\n Syntax: list [OPTIONS]");
- formatter.setSyntaxPrefix(" \"list\" action options:");
- formatter.printHelp(" ", getListOptions(new Options()));
- System.out.println();
- }
-
- private void printHelpForCancel() {
- HelpFormatter formatter = new HelpFormatter();
- formatter.setLeftPadding(5);
- formatter.setWidth(80);
-
- System.out.println("\nAction \"cancel\" cancels a running program.");
- System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>");
- formatter.setSyntaxPrefix(" \"cancel\" action options:");
- formatter.printHelp(" ", getCancelOptions(new Options()));
- System.out.println();
- }
-
private int handleArgException(Exception e) {
+ LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage()));
+
System.out.println(e.getMessage());
System.out.println();
- System.out.println("Specify the help option (-h or --help) to get help on the command.");
+ System.out.println("Use the help option (-h or --help) to get help on the command.");
return 1;
}
/**
- * Displays exceptions.
+ * Displays an exception message.
*
* @param t The exception to display.
+ * @return The return code for the process.
*/
private int handleError(Throwable t) {
+ LOG.error("Error while running the command.", t);
+
t.printStackTrace();
System.err.println();
System.err.println("The exception above occurred while trying to run your command.");
return 1;
}
+ private void logAndSysout(String message) {
+ LOG.info(message);
+ System.out.println(message);
+ }
-
+ // --------------------------------------------------------------------------------------------
+ // Entry point for executable
+ // --------------------------------------------------------------------------------------------
- private void evaluateGeneralOptions(CommandLine line) {
- // check help flag
- this.printHelp = line.hasOption(HELP_OPTION.getOpt());
- }
-
/**
* Parses the command line arguments and starts the requested action.
*
@@ -1057,7 +793,7 @@ public class CliFrontend {
// check for action
if (args.length < 1) {
- printHelp();
+ CliFrontendParser.printHelp();
System.out.println("Please specify an action.");
return 1;
}
@@ -1071,8 +807,11 @@ public class CliFrontend {
// do action
if (action.equals(ACTION_RUN)) {
// run() needs to run in a secured environment for the optimizer.
- if(SecurityUtils.isSecurityEnabled()) {
- System.out.println("Secure Hadoop setup detected.");
+ if (SecurityUtils.isSecurityEnabled()) {
+ String message = "Secure Hadoop environment setup detected. Running in secure context.";
+ LOG.info(message);
+ System.out.println(message);
+
try {
return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() {
@Override
@@ -1085,16 +824,21 @@ public class CliFrontend {
}
}
return run(params);
- } else if (action.equals(ACTION_LIST)) {
+ }
+ else if (action.equals(ACTION_LIST)) {
return list(params);
- } else if (action.equals(ACTION_INFO)) {
+ }
+ else if (action.equals(ACTION_INFO)) {
return info(params);
- } else if (action.equals(ACTION_CANCEL)) {
+ }
+ else if (action.equals(ACTION_CANCEL)) {
return cancel(params);
- } else if (action.equals("-h") || action.equals("--help")) {
- printHelp();
+ }
+ else if (action.equals("-h") || action.equals("--help")) {
+ CliFrontendParser.printHelp();
return 0;
- } else {
+ }
+ else {
System.out.printf("\"%s\" is not a valid action.\n", action);
System.out.println();
System.out.println("Valid actions are \"run\", \"list\", \"info\", or \"cancel\".");
@@ -1104,15 +848,92 @@ public class CliFrontend {
}
}
+ public void shutdown() {
+ ActorSystem sys = this.actorSystem;
+ if (sys != null) {
+ this.actorSystem = null;
+ sys.shutdown();
+ }
+ }
/**
* Submits the job based on the arguments
*/
- public static void main(String[] args) throws ParseException {
+ public static void main(String[] args) {
+ EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
+ EnvironmentInformation.checkJavaVersion();
+
+ try {
+ CliFrontend cli = new CliFrontend();
+ int retCode = cli.parseParameters(args);
+ System.exit(retCode);
+ }
+ catch (Throwable t) {
+ LOG.error("Fatal error while running command line interface.", t);
+ t.printStackTrace();
+ System.exit(31);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Miscellaneous Utilities
+ // --------------------------------------------------------------------------------------------
+
+ private static InetSocketAddress parseJobManagerAddress(String hostAndPort) {
+ URI uri;
+ try {
+ uri = new URI("my://" + hostAndPort);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Malformed address " + hostAndPort, e);
+ }
+ String host = uri.getHost();
+ int port = uri.getPort();
+ if (host == null || port == -1) {
+ throw new RuntimeException("Address is missing hostname or port " + hostAndPort);
+ }
+ return new InetSocketAddress(host, port);
+ }
- CliFrontend cli = new CliFrontend();
- int retCode = cli.parseParameters(args);
- System.exit(retCode);
+ public static String getConfigurationDirectoryFromEnv() {
+ String location = System.getenv(ENV_CONFIG_DIRECTORY);
+
+ if (location != null) {
+ if (new File(location).exists()) {
+ return location;
+ }
+ else {
+ throw new RuntimeException("The config directory '" + location + "', specified in the '" +
+ ENV_CONFIG_DIRECTORY + "' environment variable, does not exist.");
+ }
+ }
+ else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
+ location = CONFIG_DIRECTORY_FALLBACK_1;
+ }
+ else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
+ location = CONFIG_DIRECTORY_FALLBACK_2;
+ }
+ else {
+ throw new RuntimeException("The configuration directory was not specified. " +
+ "Please specify the directory containing the configuration file through the '" +
+ ENV_CONFIG_DIRECTORY + "' environment variable.");
+ }
+ return location;
}
+ public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
+ List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
+ if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
+ String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+ for(String propLine : propertyLines) {
+ if(propLine == null) {
+ continue;
+ }
+ String[] kv = propLine.split("=");
+ if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) {
+ ret.add(new Tuple2<String, String>(kv[0], kv[1]));
+ }
+ }
+ }
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
new file mode 100644
index 0000000..22e9ece
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the CANCEL command
+ */
+public class CancelOptions extends CommandLineOptions {
+
+ private final String[] args;
+
+ public CancelOptions(CommandLine line) {
+ super(line);
+ this.args = line.getArgs();
+ }
+
+ public String[] getArgs() {
+ return args == null ? new String[0] : args;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
new file mode 100644
index 0000000..932c66d
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+/**
+ * Special exception that is thrown when the command line parsing fails.
+ */
+public class CliArgsException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public CliArgsException(String message) {
+ super(message);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
new file mode 100644
index 0000000..0f6ad24
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+
+/**
+ * A simple command line parser (based on Apache Commons CLI) that extracts command
+ * line options.
+ *
+ * <p>The class defines the options understood by the "run", "info", "list" and "cancel"
+ * actions, offers one {@code parse...Command} method per action, and prints the help
+ * text for each action to standard out.
+ */
+public class CliFrontendParser {
+
+    /** Command line interface of the YARN session, with a special initialization here
+     * to prefix all options with y/yarn. */
+    private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn");
+
+
+    static final Option HELP_OPTION = new Option("h", "help", false,
+            "Show the help message for the CLI Frontend or the action.");
+
+    static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
+
+    // the closing parenthesis after the method names was missing in the help text
+    static final Option CLASS_OPTION = new Option("c", "class", true,
+            "Class with the program entry point (\"main\" method or \"getPlan()\" method). Only needed if the " +
+            "JAR file does not specify the class in its manifest.");
+
+    static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
+            "The parallelism with which to run the program. Optional flag to override the default value " +
+            "specified in the configuration.");
+
+    static final Option ARGS_OPTION = new Option("a", "arguments", true,
+            "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
+
+    static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
+            "Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER
+            + "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
+            "different JobManager than the one specified in the configuration.");
+
+    // list specific options
+    static final Option RUNNING_OPTION = new Option("r", "running", false,
+            "Show only running programs and their JobIDs");
+
+    static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false,
+            "Show only scheduled programs and their JobIDs");
+
+    static {
+        HELP_OPTION.setRequired(false);
+
+        JAR_OPTION.setRequired(false);
+        JAR_OPTION.setArgName("jarfile");
+
+        CLASS_OPTION.setRequired(false);
+        CLASS_OPTION.setArgName("classname");
+
+        ADDRESS_OPTION.setRequired(false);
+        ADDRESS_OPTION.setArgName("host:port");
+
+        PARALLELISM_OPTION.setRequired(false);
+        PARALLELISM_OPTION.setArgName("parallelism");
+
+        ARGS_OPTION.setRequired(false);
+        ARGS_OPTION.setArgName("programArgs");
+        ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
+
+        RUNNING_OPTION.setRequired(false);
+        SCHEDULED_OPTION.setRequired(false);
+    }
+
+    // These must stay below the option constants and the static block above:
+    // static initializers run in textual order and the builders read those constants.
+    private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options()));
+    private static final Options INFO_OPTIONS = getInfoOptions(buildGeneralOptions(new Options()));
+    private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options()));
+    private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options()));
+
+
+    /**
+     * Adds the options shared by all actions (help and the deprecated verbose flag).
+     */
+    private static Options buildGeneralOptions(Options options) {
+        options.addOption(HELP_OPTION);
+        // backwards compatibility: ignore verbose flag (-v)
+        options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
+        return options;
+    }
+
+    /**
+     * Adds all options that describe a program (JAR file, entry point class, parallelism,
+     * program arguments) plus the YARN session options to the given option set.
+     *
+     * @param options The option set to extend.
+     * @return The same option set that was passed in.
+     */
+    public static Options getProgramSpecificOptions(Options options) {
+        options.addOption(JAR_OPTION);
+        options.addOption(CLASS_OPTION);
+        options.addOption(PARALLELISM_OPTION);
+        options.addOption(ARGS_OPTION);
+
+        // also add the YARN options so that the parser can parse them
+        yarnSessionCLi.getYARNSessionCLIOptions(options);
+        return options;
+    }
+
+    /**
+     * Adds only the class and parallelism options; used when printing the help text.
+     */
+    private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
+        options.addOption(CLASS_OPTION);
+        options.addOption(PARALLELISM_OPTION);
+        return options;
+    }
+
+    private static Options getRunOptions(Options options) {
+        Options o = getProgramSpecificOptions(options);
+        return getJobManagerAddressOption(o);
+    }
+
+    private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
+        Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
+        return getJobManagerAddressOption(o);
+    }
+
+    private static Options getJobManagerAddressOption(Options options) {
+        options.addOption(ADDRESS_OPTION);
+        return options;
+    }
+
+    private static Options getInfoOptions(Options options) {
+        options = getProgramSpecificOptions(options);
+        options = getJobManagerAddressOption(options);
+        return options;
+    }
+
+    private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
+        options = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
+        options = getJobManagerAddressOption(options);
+        return options;
+    }
+
+    private static Options getListOptions(Options options) {
+        options.addOption(RUNNING_OPTION);
+        options.addOption(SCHEDULED_OPTION);
+        options = getJobManagerAddressOption(options);
+        return options;
+    }
+
+    private static Options getCancelOptions(Options options) {
+        options = getJobManagerAddressOption(options);
+        return options;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Help
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Prints the help for the client.
+     */
+    public static void printHelp() {
+        System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
+        System.out.println();
+        System.out.println("The following actions are available:");
+
+        printHelpForRun();
+        printHelpForInfo();
+        printHelpForList();
+        printHelpForCancel();
+
+        System.out.println();
+    }
+
+    /**
+     * Creates the help formatter with the layout shared by all action help texts.
+     */
+    private static HelpFormatter newHelpFormatter() {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.setLeftPadding(5);
+        formatter.setWidth(80);
+        return formatter;
+    }
+
+    /**
+     * Prints the help for the "run" action, followed by the YARN session options.
+     */
+    public static void printHelpForRun() {
+        HelpFormatter formatter = newHelpFormatter();
+
+        System.out.println("\nAction \"run\" compiles and runs a program.");
+        System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>");
+        formatter.setSyntaxPrefix(" \"run\" action options:");
+        formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
+        formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
+        Options yarnOpts = new Options();
+        yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
+        formatter.printHelp(" ", yarnOpts);
+        System.out.println();
+    }
+
+    /**
+     * Prints the help for the "info" action.
+     */
+    public static void printHelpForInfo() {
+        HelpFormatter formatter = newHelpFormatter();
+
+        System.out.println("\nAction \"info\" shows the optimized execution plan of the program (JSON).");
+        System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>");
+        formatter.setSyntaxPrefix(" \"info\" action options:");
+        formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
+        System.out.println();
+    }
+
+    /**
+     * Prints the help for the "list" action.
+     */
+    public static void printHelpForList() {
+        HelpFormatter formatter = newHelpFormatter();
+
+        System.out.println("\nAction \"list\" lists running and scheduled programs.");
+        System.out.println("\n Syntax: list [OPTIONS]");
+        formatter.setSyntaxPrefix(" \"list\" action options:");
+        formatter.printHelp(" ", getListOptions(new Options()));
+        System.out.println();
+    }
+
+    /**
+     * Prints the help for the "cancel" action.
+     */
+    public static void printHelpForCancel() {
+        HelpFormatter formatter = newHelpFormatter();
+
+        System.out.println("\nAction \"cancel\" cancels a running program.");
+        System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>");
+        formatter.setSyntaxPrefix(" \"cancel\" action options:");
+        formatter.printHelp(" ", getCancelOptions(new Options()));
+        System.out.println();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Line Parsing
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Runs the Commons CLI parser and converts its failure into a {@link CliArgsException}
+     * that carries the same message and keeps the original exception as its cause.
+     */
+    private static CommandLine parse(Options options, String[] args, boolean stopAtNonOption)
+            throws CliArgsException {
+        try {
+            return new PosixParser().parse(options, args, stopAtNonOption);
+        }
+        catch (ParseException e) {
+            CliArgsException failure = new CliArgsException(e.getMessage());
+            // keep the parser's stack trace available for debugging
+            failure.initCause(e);
+            throw failure;
+        }
+    }
+
+    /**
+     * Parses the arguments of the "run" action. Parsing stops at the first token that is
+     * not a known option; the remaining tokens are kept as plain arguments.
+     *
+     * @param args The arguments that follow the action name.
+     * @return The parsed options.
+     * @throws CliArgsException Thrown if the arguments cannot be parsed or hold an invalid value.
+     */
+    public static RunOptions parseRunCommand(String[] args) throws CliArgsException {
+        return new RunOptions(parse(RUN_OPTIONS, args, true));
+    }
+
+    /**
+     * Parses the arguments of the "list" action.
+     *
+     * @param args The arguments that follow the action name.
+     * @return The parsed options.
+     * @throws CliArgsException Thrown if the arguments cannot be parsed.
+     */
+    public static ListOptions parseListCommand(String[] args) throws CliArgsException {
+        return new ListOptions(parse(LIST_OPTIONS, args, false));
+    }
+
+    /**
+     * Parses the arguments of the "cancel" action.
+     *
+     * @param args The arguments that follow the action name.
+     * @return The parsed options.
+     * @throws CliArgsException Thrown if the arguments cannot be parsed.
+     */
+    public static CancelOptions parseCancelCommand(String[] args) throws CliArgsException {
+        return new CancelOptions(parse(CANCEL_OPTIONS, args, false));
+    }
+
+    /**
+     * Parses the arguments of the "info" action.
+     *
+     * @param args The arguments that follow the action name.
+     * @return The parsed options.
+     * @throws CliArgsException Thrown if the arguments cannot be parsed or hold an invalid value.
+     */
+    public static InfoOptions parseInfoCommand(String[] args) throws CliArgsException {
+        // NOTE(review): unlike "run", this does not stop at the first non-option token, so
+        // program arguments that look like options are presumably rejected as unrecognized
+        // - confirm whether that difference is intended.
+        return new InfoOptions(parse(INFO_OPTIONS, args, false));
+    }
+
+    /**
+     * Gets the YARN session command line interface whose options are registered here.
+     */
+    public static FlinkYarnSessionCli getFlinkYarnSessionCli() {
+        return yarnSessionCLi;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
new file mode 100644
index 0000000..f6f6319
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+
+/**
+ * Common parent of the option holders created from a parsed command line.
+ * Keeps the parsed command line itself, whether the help flag was given, and
+ * the JobManager address (or {@code null} when no address was specified).
+ */
+public abstract class CommandLineOptions {
+
+    private final CommandLine commandLine;
+
+    private final String jobManagerAddress;
+
+    private final boolean printHelp;
+
+
+    /**
+     * Reads the help flag and the JobManager address from the parsed command line.
+     *
+     * @param line The parsed command line.
+     */
+    protected CommandLineOptions(CommandLine line) {
+        final String helpKey = HELP_OPTION.getOpt();
+        final String addressKey = ADDRESS_OPTION.getOpt();
+
+        this.commandLine = line;
+        this.printHelp = line.hasOption(helpKey);
+
+        if (line.hasOption(addressKey)) {
+            this.jobManagerAddress = line.getOptionValue(addressKey);
+        }
+        else {
+            this.jobManagerAddress = null;
+        }
+    }
+
+    /** Returns the parsed command line these options were created from. */
+    public CommandLine getCommandLine() {
+        return commandLine;
+    }
+
+    /** Returns whether the help flag was present on the command line. */
+    public boolean isPrintHelp() {
+        return printHelp;
+    }
+
+    /** Returns the JobManager address from the command line, or {@code null} if none was given. */
+    public String getJobManagerAddress() {
+        return jobManagerAddress;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
new file mode 100644
index 0000000..83f5c38
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the INFO command.
+ *
+ * Adds no state of its own: the constructor hands the parsed command line to
+ * ProgramOptions, which extracts the JAR file path, the entry point class, the
+ * program arguments and the parallelism, and which throws a CliArgsException
+ * when the parallelism value is not a positive integer.
+ */
+public class InfoOptions extends ProgramOptions {
+
+ public InfoOptions(CommandLine line) throws CliArgsException {
+ super(line);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
new file mode 100644
index 0000000..45f39a4
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import static org.apache.flink.client.cli.CliFrontendParser.RUNNING_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SCHEDULED_OPTION;
+
+/**
+ * Options of the LIST command: the common options plus the two filter flags
+ * that select running and scheduled programs.
+ */
+public class ListOptions extends CommandLineOptions {
+
+    private final boolean running;
+    private final boolean scheduled;
+
+    /**
+     * Reads the "running" and "scheduled" flags from the parsed command line.
+     *
+     * @param line The parsed command line.
+     */
+    public ListOptions(CommandLine line) {
+        super(line);
+
+        final String runningKey = RUNNING_OPTION.getOpt();
+        final boolean runningRequested = line.hasOption(runningKey);
+
+        final String scheduledKey = SCHEDULED_OPTION.getOpt();
+        final boolean scheduledRequested = line.hasOption(scheduledKey);
+
+        this.running = runningRequested;
+        this.scheduled = scheduledRequested;
+    }
+
+    /** Returns whether the "running" flag was set. */
+    public boolean getRunning() {
+        return running;
+    }
+
+    /** Returns whether the "scheduled" flag was set. */
+    public boolean getScheduled() {
+        return scheduled;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
new file mode 100644
index 0000000..5b24a41
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import java.util.Arrays;
+
+import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+
+/**
+ * Parent class of the option holders for commands that operate on a program
+ * packaged in a JAR file. Holds the JAR file path, the entry point class name,
+ * the program arguments and the parallelism.
+ */
+public abstract class ProgramOptions extends CommandLineOptions {
+
+    private final String jarFilePath;
+
+    private final String entryPointClass;
+
+    private final String[] programArgs;
+
+    private final int parallelism;
+
+    /**
+     * Extracts the program-related settings from the parsed command line.
+     *
+     * @param line The parsed command line.
+     * @throws CliArgsException Thrown if the parallelism value is not a positive integer.
+     */
+    protected ProgramOptions(CommandLine line) throws CliArgsException {
+        super(line);
+
+        // values of the arguments option win over the plain trailing arguments
+        final String[] candidates;
+        if (line.hasOption(ARGS_OPTION.getOpt())) {
+            candidates = line.getOptionValues(ARGS_OPTION.getOpt());
+        }
+        else {
+            candidates = line.getArgs();
+        }
+
+        if (line.hasOption(JAR_OPTION.getOpt())) {
+            this.jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
+            this.programArgs = candidates;
+        }
+        else if (candidates.length > 0) {
+            // without an explicit jar option, the first argument is taken as the JAR file
+            this.jarFilePath = candidates[0];
+            this.programArgs = Arrays.copyOfRange(candidates, 1, candidates.length);
+        }
+        else {
+            this.jarFilePath = null;
+            this.programArgs = candidates;
+        }
+
+        if (line.hasOption(CLASS_OPTION.getOpt())) {
+            this.entryPointClass = line.getOptionValue(CLASS_OPTION.getOpt());
+        }
+        else {
+            this.entryPointClass = null;
+        }
+
+        // -1 marks "no parallelism given on the command line"
+        this.parallelism = line.hasOption(PARALLELISM_OPTION.getOpt())
+                ? parsePositiveParallelism(line.getOptionValue(PARALLELISM_OPTION.getOpt()))
+                : -1;
+    }
+
+    /**
+     * Converts the parallelism string into an integer greater than zero.
+     */
+    private static int parsePositiveParallelism(String parString) throws CliArgsException {
+        try {
+            final int parsed = Integer.parseInt(parString);
+            if (parsed > 0) {
+                return parsed;
+            }
+        }
+        catch (NumberFormatException ignored) {
+            // reported below together with non-positive values
+        }
+        throw new CliArgsException("The parallelism must be a positive number: " + parString);
+    }
+
+    /** Returns the path of the JAR file, or {@code null} if none was given. */
+    public String getJarFilePath() {
+        return jarFilePath;
+    }
+
+    /** Returns the entry point class name, or {@code null} if none was given. */
+    public String getEntryPointClassName() {
+        return entryPointClass;
+    }
+
+    /** Returns the arguments to pass to the program. */
+    public String[] getProgramArgs() {
+        return programArgs;
+    }
+
+    /** Returns the parallelism, or -1 if none was given on the command line. */
+    public int getParallelism() {
+        return parallelism;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
new file mode 100644
index 0000000..2e4eb31
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the RUN command.
+ *
+ * Adds no state of its own: the constructor hands the parsed command line to
+ * ProgramOptions, which extracts the JAR file path, the entry point class, the
+ * program arguments and the parallelism, and which throws a CliArgsException
+ * when the parallelism value is not a positive integer.
+ */
+public class RunOptions extends ProgramOptions {
+
+ public RunOptions(CommandLine line) throws CliArgsException {
+ super(line);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 5a032a0..f4a2dc9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -25,7 +25,6 @@ import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.List;
-import akka.remote.AssociationErrorEvent;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
@@ -219,8 +218,7 @@ public class Client {
}
private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
- JobGraph job = null;
-
+ JobGraph job;
if (optPlan instanceof StreamingPlan) {
job = ((StreamingPlan) optPlan).getJobGraph();
} else {
@@ -356,21 +354,6 @@ public class Client {
return new JobExecutionResult(-1, null);
}
- private Throwable getAssociationError(List<AssociationErrorEvent> eventLog) {
- int len = eventLog.size();
- if (len > 0) {
- AssociationErrorEvent e = eventLog.get(len - 1);
- Throwable cause = e.getCause();
- if (cause instanceof akka.remote.InvalidAssociation) {
- return cause.getCause();
- } else {
- return cause;
- }
- } else {
- return null;
- }
- }
-
// --------------------------------------------------------------------------------------------
public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {