You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/17 08:45:23 UTC
[09/10] flink git commit: [FLINK-3937] programmatic resuming of
clusters
[FLINK-3937] programmatic resuming of clusters
- integrates with and extends the refactoring of FLINK-3667
- enables resuming from Yarn properties or Yarn application id
- introduces additional StandaloneClusterDescriptor
- introduces DefaultCLI to get rid of standalone mode switches in CliFrontend
- various fixes and improvements
- remove legacy code from CliFrontend
- change activation code of CustomCommandLine interface
- use checked exceptions to signal supported operations
- remove all checked exceptions of type Exception
- fix logging and reduce verbosity of per-job clusters
- print 'id' argument in YarnSessionCli
- minor renaming of method names
- improve documentation
- deprecate streaming option
- extend CliFrontendYarnAddressConfigurationTest
- move loading of custom CLIs to CliFrontend
This closes #2085
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4ac8522
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4ac8522
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4ac8522
Branch: refs/heads/master
Commit: f4ac852275da36ee33aa54ae9097293ccc981afa
Parents: 875d4d2
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Apr 25 16:28:51 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jun 17 10:37:58 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 217 +++++------
.../flink/client/cli/CliFrontendParser.java | 225 ++++++------
.../flink/client/cli/CustomCommandLine.java | 34 +-
.../org/apache/flink/client/cli/DefaultCLI.java | 77 ++++
.../client/deployment/ClusterDescriptor.java | 14 +-
.../deployment/StandaloneClusterDescriptor.java | 56 +++
.../flink/client/program/ClusterClient.java | 16 +-
.../CliFrontendAddressConfigurationTest.java | 59 +--
.../org/apache/flink/api/scala/FlinkShell.scala | 24 +-
...CliFrontendYarnAddressConfigurationTest.java | 360 +++++++++++++++----
.../flink/yarn/FlinkYarnSessionCliTest.java | 10 +-
.../yarn/AbstractYarnClusterDescriptor.java | 220 ++++++++----
.../apache/flink/yarn/YarnClusterClient.java | 151 ++++----
.../flink/yarn/YarnClusterDescriptor.java | 2 +
.../flink/yarn/cli/FlinkYarnSessionCli.java | 226 ++++++------
15 files changed, 1062 insertions(+), 629 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 3064f8d..a01ab53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -18,8 +18,7 @@
package org.apache.flink.client;
-import akka.actor.ActorSystem;
-
+import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
@@ -30,6 +29,7 @@ import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.InfoOptions;
import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.ProgramOptions;
@@ -39,7 +39,6 @@ import org.apache.flink.client.cli.StopOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -56,7 +55,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
@@ -67,13 +65,12 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -81,6 +78,8 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.text.SimpleDateFormat;
@@ -89,6 +88,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -102,9 +102,11 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepo
*/
public class CliFrontend {
+ private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
+
// actions
- public static final String ACTION_RUN = "run";
- public static final String ACTION_INFO = "info";
+ private static final String ACTION_RUN = "run";
+ private static final String ACTION_INFO = "info";
private static final String ACTION_LIST = "list";
private static final String ACTION_CANCEL = "cancel";
private static final String ACTION_STOP = "stop";
@@ -116,19 +118,24 @@ public class CliFrontend {
private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
// --------------------------------------------------------------------------------------------
+
+ private static final List<CustomCommandLine> customCommandLine = new LinkedList<>();
+
+ static {
+ /** command line interface of the YARN session, with a special initialization here
+ * to prefix all options with y/yarn. */
+ loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
+ customCommandLine.add(new DefaultCLI());
+ }
+
// --------------------------------------------------------------------------------------------
- private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
private final Configuration config;
private final FiniteDuration clientTimeout;
- private final FiniteDuration lookupTimeout;
-
- private ActorSystem actorSystem;
-
/**
*
* @throws Exception Thrown if the configuration directory was not found, the configuration could not be loaded
@@ -146,6 +153,8 @@ public class CliFrontend {
// load the configuration
LOG.info("Trying to load configuration file");
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+ System.setProperty("FLINK_CONF_DIR", configDirectory.getAbsolutePath());
+
this.config = GlobalConfiguration.getConfiguration();
try {
@@ -156,7 +165,6 @@ public class CliFrontend {
}
this.clientTimeout = AkkaUtils.getClientTimeout(config);
- this.lookupTimeout = AkkaUtils.getLookupTimeout(config);
}
@@ -798,19 +806,20 @@ public class CliFrontend {
*
* @param options Command line options
*/
- protected void updateConfig(CommandLineOptions options) {
- if(options.getJobManagerAddress() != null){
- if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
- jobManagerAddress = CliFrontendParser.getFlinkYarnSessionCli()
- .attachFlinkYarnClient(options.getCommandLine())
- .getJobManagerAddress();
- InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
- writeJobManagerAddressToConfig(config, jobManagerAddress);
+ protected ClusterClient retrieveClient(CommandLineOptions options) {
+ CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
+ try {
+ ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
+ LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
+ return client;
+ } catch (Exception e) {
+ LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
+ throw new IllegalConfigurationException("Couldn't retrieve client for cluster", e);
}
}
/**
- * Retrieves the {@link ActorGateway} for the JobManager. The JobManager address is retrieved
+ * Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved
* from the provided {@link CommandLineOptions}.
*
* @param options CommandLineOptions specifying the JobManager URL
@@ -818,92 +827,41 @@ public class CliFrontend {
* @throws Exception
*/
protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
- // overwrite config values with given command line options
- updateConfig(options);
-
- // start an actor system if needed
- if (this.actorSystem == null) {
- LOG.info("Starting actor system to communicate with JobManager");
- try {
- scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
- this.actorSystem = AkkaUtils.createActorSystem(
- config,
- new Some<scala.Tuple2<String, Object>>(systemEndpoint));
- }
- catch (Exception e) {
- throw new IOException("Could not start actor system to communicate with JobManager", e);
- }
-
- LOG.info("Actor system successfully started");
- }
-
- LOG.info("Trying to lookup the JobManager gateway");
- // Retrieve the ActorGateway from the LeaderRetrievalService
- LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-
- return LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, lookupTimeout);
+ return retrieveClient(options).getJobManagerGateway();
}
/**
- * Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
- *
- * @param options Command line options which contain JobManager address
+ * Creates a {@link ClusterClient} object from the given command line options and other parameters.
+ * @param options Command line options
* @param programName Program name
* @throws Exception
*/
protected ClusterClient getClient(
CommandLineOptions options,
- String programName)
- throws Exception {
- InetSocketAddress jobManagerAddress;
-
- // try to get the JobManager address via command-line args
- if (options.getJobManagerAddress() != null) {
+ String programName) throws Exception {
- // Get the custom command-lines (e.g. Yarn/Mesos)
- CustomCommandLine<?> activeCommandLine =
- CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
+ // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
+ CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
- if (activeCommandLine != null) {
- logAndSysout(activeCommandLine.getIdentifier() + " mode detected. Switching Log4j output to console");
-
- // Default yarn application name to use, if nothing is specified on the command line
+ ClusterClient client;
+ try {
+ client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
+ logAndSysout("Cluster retrieved");
+ } catch (UnsupportedOperationException e) {
+ try {
String applicationName = "Flink Application: " + programName;
-
- ClusterClient client = activeCommandLine.createClient(applicationName, options.getCommandLine());
-
+ client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config);
logAndSysout("Cluster started");
- logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
-
- return client;
- } else {
- // job manager address supplied on the command-line
- LOG.info("Using address {} to connect to JobManager.", options.getJobManagerAddress());
- jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
- writeJobManagerAddressToConfig(config, jobManagerAddress);
- return new StandaloneClusterClient(config);
- }
-
- // try to get the JobManager address via resuming of a cluster
- } else {
- for (CustomCommandLine cli : CliFrontendParser.getAllCustomCommandLine().values()) {
- ClusterClient client = cli.retrieveCluster(config);
- if (client != null) {
- LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
- return client;
- }
+ } catch (UnsupportedOperationException e2) {
+ throw new IllegalConfigurationException(
+ "The JobManager address is neither provided at the command-line, " +
+ "nor configured in flink-conf.yaml.");
}
}
- // read JobManager address from the config
- if (config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) != null) {
- return new StandaloneClusterClient(config);
- // We tried hard but couldn't find a JobManager address
- } else {
- throw new IllegalConfigurationException(
- "The JobManager address is neither provided at the command-line, " +
- "nor configured in flink-conf.yaml.");
- }
+ logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
+ logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
+ return client;
}
// --------------------------------------------------------------------------------------------
@@ -917,7 +875,7 @@ public class CliFrontend {
* @return The return code for the process.
*/
private int handleArgException(Exception e) {
- LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage()));
+ LOG.error("Invalid command line arguments. " + (e.getMessage() == null ? "" : e.getMessage()));
System.out.println(e.getMessage());
System.out.println();
@@ -1039,14 +997,6 @@ public class CliFrontend {
}
}
- public void shutdown() {
- ActorSystem sys = this.actorSystem;
- if (sys != null) {
- this.actorSystem = null;
- sys.shutdown();
- }
- }
-
/**
* Submits the job based on the arguments
*/
@@ -1070,7 +1020,8 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
public static String getConfigurationDirectoryFromEnv() {
- String location = System.getenv(ENV_CONFIG_DIRECTORY);
+ String envLocation = System.getenv(ENV_CONFIG_DIRECTORY);
+ String location = envLocation != null ? envLocation : System.getProperty(ENV_CONFIG_DIRECTORY);
if (location != null) {
if (new File(location).exists()) {
@@ -1102,9 +1053,65 @@ public class CliFrontend {
* @param address Address to write to the configuration
* @param config The config to write to
*/
- public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) {
+ public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
}
+ // --------------------------------------------------------------------------------------------
+ // Custom command-line
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the custom command-line for the arguments.
+ * @param commandLine The input to the command-line.
+ * @return custom command-line which is active (may only be one at a time)
+ */
+ public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+ for (CustomCommandLine cli : customCommandLine) {
+ if (cli.isActive(commandLine, config)) {
+ return cli;
+ }
+ }
+ throw new IllegalStateException("No command-line ran.");
+ }
+
+ /**
+ * Retrieves the loaded custom command-lines.
+ * @return An unmodifiable list of loaded custom command-lines.
+ */
+ public static List<CustomCommandLine> getCustomCommandLineList() {
+ return Collections.unmodifiableList(customCommandLine);
+ }
+
+ /**
+ * Loads a class from the classpath that implements the CustomCommandLine interface.
+ * @param className The fully-qualified class name to load.
+ * @param params The constructor parameters
+ */
+ private static void loadCustomCommandLine(String className, Object... params) {
+
+ try {
+ Class<? extends CustomCommandLine> customCliClass =
+ Class.forName(className).asSubclass(CustomCommandLine.class);
+
+ // construct class types from the parameters
+ Class<?>[] types = new Class<?>[params.length];
+ for (int i = 0; i < params.length; i++) {
+ Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
+ types[i] = params[i].getClass();
+ }
+
+ Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
+ final CustomCommandLine cli = constructor.newInstance(params);
+
+ customCommandLine.add(cli);
+
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
+ | InvocationTargetException e) {
+ LOG.warn("Unable to locate custom CLI class {}. " +
+ "Flink is not compiled with support for this class.", className, e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 9b935e8..c90793d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -24,16 +24,10 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.client.CliFrontend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* A simple command line parser (based on Apache Commons CLI) that extracts command
@@ -44,16 +38,6 @@ public class CliFrontendParser {
private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class);
- /** command line interface of the YARN session, with a special initialization here
- * to prefix all options with y/yarn. */
- private static final Map<String, CustomCommandLine> customCommandLine = new HashMap<>(1);
-
- static {
- // we could easily add more here in the future
- loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
- }
-
-
static final Option HELP_OPTION = new Option("h", "help", false,
"Show the help message for the CLI Frontend or the action.");
@@ -82,9 +66,8 @@ public class CliFrontendParser {
static final Option ARGS_OPTION = new Option("a", "arguments", true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
- static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
+ public static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
"Address of the JobManager (master) to which to connect. " +
- "Specify " + getCliIdentifierString() +" as the JobManager to deploy a cluster for the job. " +
"Use this flag to connect to a different JobManager than the one specified in the configuration.");
static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
@@ -146,6 +129,10 @@ public class CliFrontendParser {
options.addOption(HELP_OPTION);
// backwards compatibility: ignore verbose flag (-v)
options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
+ // add general options of all CLIs
+ for (CustomCommandLine customCLI : CliFrontend.getCustomCommandLineList()) {
+ customCLI.addGeneralOptions(options);
+ }
return options;
}
@@ -158,11 +145,6 @@ public class CliFrontendParser {
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SAVEPOINT_PATH_OPTION);
-
- for (CustomCommandLine customCLI : customCommandLine.values()) {
- customCLI.addOptions(options);
- }
-
return options;
}
@@ -177,62 +159,85 @@ public class CliFrontendParser {
}
private static Options getRunOptions(Options options) {
- Options o = getProgramSpecificOptions(options);
- return getJobManagerAddressOption(o);
+ options = getProgramSpecificOptions(options);
+ options = getJobManagerAddressOption(options);
+ return addCustomCliOptions(options, true);
}
- private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
- Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
- return getJobManagerAddressOption(o);
- }
private static Options getJobManagerAddressOption(Options options) {
options.addOption(ADDRESS_OPTION);
- yarnSessionCLi.getYARNAttachCLIOptions(options);
-
return options;
}
private static Options getInfoOptions(Options options) {
options = getProgramSpecificOptions(options);
options = getJobManagerAddressOption(options);
- return options;
+ return addCustomCliOptions(options, false);
+ }
+
+ private static Options getListOptions(Options options) {
+ options.addOption(RUNNING_OPTION);
+ options.addOption(SCHEDULED_OPTION);
+ options = getJobManagerAddressOption(options);
+ return addCustomCliOptions(options, false);
+ }
+
+ private static Options getCancelOptions(Options options) {
+ options = getJobManagerAddressOption(options);
+ return addCustomCliOptions(options, false);
+ }
+
+ private static Options getStopOptions(Options options) {
+ options = getJobManagerAddressOption(options);
+ return addCustomCliOptions(options, false);
+ }
+
+ private static Options getSavepointOptions(Options options) {
+ options = getJobManagerAddressOption(options);
+ options.addOption(SAVEPOINT_DISPOSE_OPTION);
+ return addCustomCliOptions(options, false);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Help
+ // --------------------------------------------------------------------------------------------
+
+ private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
+ Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
+ return getJobManagerAddressOption(o);
}
+
private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(CLASS_OPTION);
options.addOption(PARALLELISM_OPTION);
- options = getJobManagerAddressOption(options);
return options;
}
- private static Options getListOptions(Options options) {
+ private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(RUNNING_OPTION);
options.addOption(SCHEDULED_OPTION);
options = getJobManagerAddressOption(options);
return options;
}
- private static Options getCancelOptions(Options options) {
+ private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
options = getJobManagerAddressOption(options);
return options;
}
- private static Options getStopOptions(Options options) {
+ private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
options = getJobManagerAddressOption(options);
return options;
}
- private static Options getSavepointOptions(Options options) {
+ private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
options = getJobManagerAddressOption(options);
options.addOption(SAVEPOINT_DISPOSE_OPTION);
return options;
}
- // --------------------------------------------------------------------------------------------
- // Help
- // --------------------------------------------------------------------------------------------
-
/**
* Prints the help for the client.
*/
@@ -261,14 +266,7 @@ public class CliFrontendParser {
formatter.setSyntaxPrefix(" \"run\" action options:");
formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
- // prints options from all available command-line classes
- for (Map.Entry<String, CustomCommandLine> entry: customCommandLine.entrySet()) {
- formatter.setSyntaxPrefix(" Additional arguments if -m " + entry.getKey() + " is set:");
- Options customOpts = new Options();
- entry.getValue().addOptions(customOpts);
- formatter.printHelp(" ", customOpts);
- System.out.println();
- }
+ printCustomCliOptions(formatter, true);
System.out.println();
}
@@ -282,10 +280,9 @@ public class CliFrontendParser {
System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>");
formatter.setSyntaxPrefix(" \"info\" action options:");
formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
- formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
- Options yarnOpts = new Options();
- yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
- formatter.printHelp(" ", yarnOpts);
+
+ printCustomCliOptions(formatter, false);
+
System.out.println();
}
@@ -297,7 +294,10 @@ public class CliFrontendParser {
System.out.println("\nAction \"list\" lists running and scheduled programs.");
System.out.println("\n Syntax: list [OPTIONS]");
formatter.setSyntaxPrefix(" \"list\" action options:");
- formatter.printHelp(" ", getListOptions(new Options()));
+ formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));
+
+ printCustomCliOptions(formatter, false);
+
System.out.println();
}
@@ -309,7 +309,10 @@ public class CliFrontendParser {
System.out.println("\nAction \"stop\" stops a running program (streaming jobs only).");
System.out.println("\n Syntax: stop [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"stop\" action options:");
- formatter.printHelp(" ", getStopOptions(new Options()));
+ formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));
+
+ printCustomCliOptions(formatter, false);
+
System.out.println();
}
@@ -321,11 +324,10 @@ public class CliFrontendParser {
System.out.println("\nAction \"cancel\" cancels a running program.");
System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"cancel\" action options:");
- formatter.printHelp(" ", getCancelOptions(new Options()));
- formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
- Options yarnOpts = new Options();
- yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
- formatter.printHelp(" ", yarnOpts);
+ formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));
+
+ printCustomCliOptions(formatter, false);
+
System.out.println();
}
@@ -337,10 +339,50 @@ public class CliFrontendParser {
System.out.println("\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
System.out.println("\n Syntax: savepoint [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"savepoint\" action options:");
- formatter.printHelp(" ", getSavepointOptions(new Options()));
+ formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));
+
+ printCustomCliOptions(formatter, false);
+
System.out.println();
}
+ /**
+ * Adds custom cli options
+ * @param options The options to add options to
+ * @param runOptions Whether to include run options
+ * @return Options with additions
+ */
+ private static Options addCustomCliOptions(Options options, boolean runOptions) {
+ for (CustomCommandLine cli: CliFrontend.getCustomCommandLineList()) {
+ cli.addGeneralOptions(options);
+ if (runOptions) {
+ cli.addRunOptions(options);
+ }
+ }
+ return options;
+ }
+
+ /**
+ * Prints custom cli options
+ * @param formatter The formatter to use for printing
+ * @param runOptions True if the run options should be printed, False to print only general options
+ */
+ private static void printCustomCliOptions(HelpFormatter formatter, boolean runOptions) {
+ // prints options from all available command-line classes
+ for (CustomCommandLine cli: CliFrontend.getCustomCommandLineList()) {
+ if (cli.getId() != null) {
+ formatter.setSyntaxPrefix(" Options for " + cli.getId() + " mode:");
+ Options customOpts = new Options();
+ cli.addGeneralOptions(customOpts);
+ if (runOptions) {
+ cli.addRunOptions(customOpts);
+ }
+ formatter.printHelp(" ", customOpts);
+ System.out.println();
+ }
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Line Parsing
// --------------------------------------------------------------------------------------------
@@ -410,63 +452,4 @@ public class CliFrontendParser {
}
}
- public static Map<String, CustomCommandLine> getAllCustomCommandLine() {
- if (customCommandLine.isEmpty()) {
- LOG.warn("No custom command-line classes were loaded.");
- }
- return Collections.unmodifiableMap(customCommandLine);
- }
-
- private static String getCliIdentifierString() {
- StringBuilder builder = new StringBuilder();
- boolean first = true;
- for (String identifier : customCommandLine.keySet()) {
- if (!first) {
- builder.append(", ");
- }
- first = false;
- builder.append("'").append(identifier).append("'");
- }
- return builder.toString();
- }
-
- /**
- * Gets the custom command-line for this identifier.
- * @param identifier The unique identifier for this command-line implementation.
- * @return CustomCommandLine or null if none was found
- */
- public static CustomCommandLine getActiveCustomCommandLine(String identifier) {
- return CliFrontendParser.getAllCustomCommandLine().get(identifier);
- }
-
- private static void loadCustomCommandLine(String className, Object... params) {
-
- try {
- Class<? extends CustomCommandLine> customCliClass =
- Class.forName(className).asSubclass(CustomCommandLine.class);
-
- // construct class types from the parameters
- Class<?>[] types = new Class<?>[params.length];
- for (int i = 0; i < params.length; i++) {
- Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
- types[i] = params[i].getClass();
- }
-
- Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
- final CustomCommandLine cli = constructor.newInstance(params);
-
- String cliIdentifier = Preconditions.checkNotNull(cli.getIdentifier());
- CustomCommandLine existing = customCommandLine.put(cliIdentifier, cli);
-
- if (existing != null) {
- throw new IllegalStateException("Attempted to register " + cliIdentifier +
- " but there is already a command-line with this identifier.");
- }
- } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
- | InvocationTargetException e) {
- LOG.warn("Unable to locate custom CLI class {}. " +
- "Flink is not compiled with support for this class.", className, e);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index cd5e0e6..aecdc7c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -29,29 +29,47 @@ import org.apache.flink.configuration.Configuration;
public interface CustomCommandLine<ClusterType extends ClusterClient> {
/**
- * Returns a unique identifier for this custom command-line.
- * @return An unique identifier string
+ * Signals whether the custom command-line wants to execute or not
+ * @param commandLine The command-line options
+ * @param configuration The Flink configuration
+ * @return True if the command-line wants to run, False otherwise
*/
- String getIdentifier();
+ boolean isActive(CommandLine commandLine, Configuration configuration);
/**
- * Adds custom options to the existing options.
+ * Gets the unique identifier of this CustomCommandLine
+ * @return A unique identifier
+ */
+ String getId();
+
+ /**
+ * Adds custom options to the existing run options.
+ * @param baseOptions The existing options.
+ */
+ void addRunOptions(Options baseOptions);
+
+ /**
+ * Adds custom options to the existing general options.
* @param baseOptions The existing options.
*/
- void addOptions(Options baseOptions);
+ void addGeneralOptions(Options baseOptions);
/**
* Retrieves a client for a running cluster
+ * @param commandLine The command-line parameters from the CliFrontend
* @param config The Flink config
- * @return Client if a cluster could be retrieve, null otherwise
+ * @return Client if a cluster could be retrieved
+ * @throws UnsupportedOperationException if the operation is not supported
*/
- ClusterClient retrieveCluster(Configuration config) throws Exception;
+ ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
/**
* Creates the client for the cluster
* @param applicationName The application name to use
* @param commandLine The command-line options parsed by the CliFrontend
+ * @param config The Flink config to use
* @return The client to communicate with the cluster which the CustomCommandLine brought up.
+ * @throws UnsupportedOperationException if the operation is not supported
*/
- ClusterType createClient(String applicationName, CommandLine commandLine) throws Exception;
+ ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
new file mode 100644
index 0000000..8bceed7
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+import java.net.InetSocketAddress;
+
+import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
+
+/**
+ * The default CLI which is used for interaction with standalone clusters.
+ */
+public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
+
+ @Override
+ public boolean isActive(CommandLine commandLine, Configuration configuration) {
+ // always active because we can try to read a JobManager address from the config
+ return true;
+ }
+
+ @Override
+ public String getId() {
+ return null;
+ }
+
+ @Override
+ public void addRunOptions(Options baseOptions) {
+ }
+
+ @Override
+ public void addGeneralOptions(Options baseOptions) {
+ }
+
+ @Override
+ public StandaloneClusterClient retrieveCluster(CommandLine commandLine, Configuration config) {
+
+ if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
+ String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
+ InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort);
+ setJobManagerAddressInConfig(config, jobManagerAddress);
+ }
+
+ StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
+ return descriptor.retrieve(null);
+ }
+
+ @Override
+ public StandaloneClusterClient createCluster(
+ String applicationName,
+ CommandLine commandLine,
+ Configuration config) throws UnsupportedOperationException {
+
+ StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
+ return descriptor.deploy();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index cf0595b..59cece3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -30,12 +30,20 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {
* Returns a String containing details about the cluster (NodeManagers, available memory, ...)
*
*/
- String getClusterDescription() throws Exception;
+ String getClusterDescription();
+
+ /**
+ * Retrieves an existing Flink Cluster.
+ * @param applicationID The unique application identifier of the running cluster
+ * @return Client for the cluster
+ * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
+ */
+ ClientType retrieve(String applicationID) throws UnsupportedOperationException;
/**
* Triggers deployment of a cluster
* @return Client for the cluster
- * @throws Exception
+ * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
*/
- ClientType deploy() throws Exception;
+ ClientType deploy() throws UnsupportedOperationException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
new file mode 100644
index 0000000..57ccc47
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+
+/**
+ * A deployment descriptor for an existing cluster
+ */
+public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterClient> {
+
+ private final Configuration config;
+
+ public StandaloneClusterDescriptor(Configuration config) {
+ this.config = config;
+ }
+
+ @Override
+ public String getClusterDescription() {
+ String host = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "");
+ int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+ return "Standalone cluster at " + host + ":" + port;
+ }
+
+ @Override
+ public StandaloneClusterClient retrieve(String applicationID) {
+ try {
+ return new StandaloneClusterClient(config);
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't retrieve standalone cluster", e);
+ }
+ }
+
+ @Override
+ public StandaloneClusterClient deploy() {
+ throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index b56428d..def9578 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -76,7 +76,7 @@ import akka.actor.ActorSystem;
*/
public abstract class ClusterClient {
- private static final Logger LOG = LoggerFactory.getLogger(ClusterClient.class);
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
/** The optimizer used in the optimization of batch programs */
final Optimizer compiler;
@@ -203,9 +203,9 @@ public abstract class ClusterClient {
*/
public InetSocketAddress getJobManagerAddressFromConfig() {
try {
- String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
- int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
- return new InetSocketAddress(hostName, port);
+ String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+ int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+ return new InetSocketAddress(hostName, port);
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve JobManager address", e);
}
@@ -255,11 +255,13 @@ public abstract class ClusterClient {
}
public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
+ Logger log = LoggerFactory.getLogger(ClusterClient.class);
+
if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
- LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
+ log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
p.setDefaultParallelism(parallelism);
}
- LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
+ log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
return compiler.compile(p);
}
@@ -603,7 +605,7 @@ public abstract class ClusterClient {
* @return ActorGateway of the current job manager leader
* @throws Exception
*/
- protected ActorGateway getJobManagerGateway() throws Exception {
+ public ActorGateway getJobManagerGateway() throws Exception {
LOG.info("Looking up JobManager");
return LeaderRetrievalUtils.retrieveLeaderGateway(
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index de85ca8..c6b1111 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -23,9 +23,13 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
+import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.RunOptions;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -57,14 +61,12 @@ public class CliFrontendAddressConfigurationTest {
public void testValidConfig() {
try {
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+ RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
- CommandLineOptions options = mock(CommandLineOptions.class);
-
- frontend.updateConfig(options);
- Configuration config = frontend.getConfiguration();
+ ClusterClient clusterClient = frontend.retrieveClient(options);
checkJobManagerAddress(
- config,
+ clusterClient.getFlinkConfiguration(),
CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS,
CliFrontendTestUtils.TEST_JOB_MANAGER_PORT);
}
@@ -74,43 +76,12 @@ public class CliFrontendAddressConfigurationTest {
}
}
- @Test
- public void testInvalidConfigAndNoOption() {
- try {
+ @Test(expected = IllegalConfigurationException.class)
+ public void testInvalidConfigAndNoOption() throws Exception {
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
- CommandLineOptions options = mock(CommandLineOptions.class);
-
- frontend.updateConfig(options);
- Configuration config = frontend.getConfiguration();
+ RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
- checkJobManagerAddress(config, null, -1);
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testInvalidConfigAndOption() {
- try {
- CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
-
- CommandLineOptions options = mock(CommandLineOptions.class);
- when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
-
- frontend.updateConfig(options);
- Configuration config = frontend.getConfiguration();
-
- InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
-
- checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ frontend.retrieveClient(options);
}
@Test
@@ -118,12 +89,10 @@ public class CliFrontendAddressConfigurationTest {
try {
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
- CommandLineOptions options = mock(CommandLineOptions.class);
- when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
-
- frontend.updateConfig(options);
+ RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"});
- Configuration config = frontend.getConfiguration();
+ ClusterClient client = frontend.retrieveClient(options);
+ Configuration config = client.getFlinkConfiguration();
InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 1a8870b..f3b3507 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala
import java.io._
+import org.apache.commons.cli.CommandLine
import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.client.CliFrontend
@@ -245,11 +246,13 @@ object FlinkShell {
yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString))
yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString))
- val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster")
val options = CliFrontendParser.parseRunCommand(args.toArray)
+ val frontend = new CliFrontend()
+ val config = frontend.getConfiguration
+ val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
- val cluster = customCLI.createClient("Flink Scala Shell", options.getCommandLine)
+ val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config)
val address = cluster.getJobManagerAddress.getAddress.getHostAddress
val port = cluster.getJobManagerAddress.getPort
@@ -259,12 +262,21 @@ object FlinkShell {
def fetchDeployedYarnClusterInfo() = {
- // load configuration
- val globalConfig = GlobalConfiguration.getConfiguration
- val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster")
+ val args = ArrayBuffer[String](
+ "-m", "yarn-cluster"
+ )
- val cluster = customCLI.retrieveCluster(globalConfig)
+ val options = CliFrontendParser.parseRunCommand(args.toArray)
+ val frontend = new CliFrontend()
+ val config = frontend.getConfiguration
+ val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
+
+ val cluster = customCLI.retrieveCluster(options.getCommandLine, config)
+
+ if (cluster == null) {
+ throw new RuntimeException("Yarn Cluster could not be retrieved.")
+ }
val jobManager = cluster.getJobManagerAddress
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index c6a1ade..217ad3d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -18,27 +18,45 @@
package org.apache.flink.yarn;
+import org.apache.commons.cli.CommandLine;
import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.junit.*;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
import java.io.File;
+import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
+import java.util.LinkedList;
+import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Tests that verify that the CLI client picks up the correct address for the JobManager
@@ -80,8 +98,10 @@ public class CliFrontendYarnAddressConfigurationTest {
private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
+ private static final ApplicationId TEST_YARN_APPLICATION_ID =
+ ApplicationId.newInstance(System.currentTimeMillis(), 42);
- private static final String propertiesFile =
+ private static final String validPropertiesFile =
"jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT;
@@ -101,110 +121,292 @@ public class CliFrontendYarnAddressConfigurationTest {
* Test that the CliFrontend is able to pick up the .yarn-properties file from a specified location.
*/
@Test
- public void testYarnConfig() {
- try {
- File tmpFolder = temporaryFolder.newFolder();
- String currentUser = System.getProperty("user.name");
+ public void testResumeFromYarnPropertiesFile() throws Exception {
- // copy .yarn-properties-<username>
- File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
- Files.write(testPropertiesFile.toPath(), propertiesFile.getBytes(), StandardOpenOption.CREATE);
+ File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
- // copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
- String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
- File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
- Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
+ // start CLI Frontend
+ TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
- // start CLI Frontend
- TestCLI frontend = new TestCLI(tmpFolder.getAbsolutePath());
+ RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
- CommandLineOptions options = mock(CommandLineOptions.class);
+ frontend.retrieveClient(options);
+ checkJobManagerAddress(
+ frontend.getConfiguration(),
+ TEST_YARN_JOB_MANAGER_ADDRESS,
+ TEST_YARN_JOB_MANAGER_PORT);
- frontend.getClient(options, "Program name");
+ }
- frontend.updateConfig(options);
- Configuration config = frontend.getConfiguration();
+ @Test(expected = IllegalConfigurationException.class)
+ public void testResumeFromYarnPropertiesFileWithFinishedApplication() throws Exception {
- checkJobManagerAddress(
- config,
- TEST_YARN_JOB_MANAGER_ADDRESS,
- TEST_YARN_JOB_MANAGER_PORT);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+
+ // start CLI Frontend
+ TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+
+ RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
+
+ frontend.retrieveClient(options);
+ checkJobManagerAddress(
+ frontend.getConfiguration(),
+ TEST_YARN_JOB_MANAGER_ADDRESS,
+ TEST_YARN_JOB_MANAGER_PORT);
}
- public static class TestCLI extends CliFrontend {
- TestCLI(String configDir) throws Exception {
- super(configDir);
- }
- @Override
- public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
- return super.getClient(options, programName);
- }
+ @Test(expected = IllegalConfigurationException.class)
+ public void testInvalidYarnPropertiesFile() throws Exception {
- @Override
- public void updateConfig(CommandLineOptions options) {
- super.updateConfig(options);
- }
+ File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
+
+ TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+
+ RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
+
+ frontend.retrieveClient(options);
+ Configuration config = frontend.getConfiguration();
+
+ checkJobManagerAddress(
+ config,
+ TEST_JOB_MANAGER_ADDRESS,
+ TEST_JOB_MANAGER_PORT);
}
+
@Test
- public void testInvalidYarnConfig() {
- try {
- File tmpFolder = temporaryFolder.newFolder();
+ public void testResumeFromYarnID() throws Exception {
+ File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
- // copy invalid .yarn-properties-<username>
- File testPropertiesFile = new File(tmpFolder, ".yarn-properties");
- Files.write(testPropertiesFile.toPath(), invalidPropertiesFile.getBytes(), StandardOpenOption.CREATE);
+ // start CLI Frontend
+ TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
- // copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
- String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
- File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
- Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
+ RunOptions options =
+ CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
- TestCLI cli = new TestCLI(tmpFolder.getAbsolutePath());
+ frontend.retrieveClient(options);
- CommandLineOptions options = mock(CommandLineOptions.class);
+ checkJobManagerAddress(
+ frontend.getConfiguration(),
+ TEST_YARN_JOB_MANAGER_ADDRESS,
+ TEST_YARN_JOB_MANAGER_PORT);
+ }
- cli.updateConfig(options);
+ @Test(expected = IllegalConfigurationException.class)
+ public void testResumeFromInvalidYarnID() throws Exception {
+ File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
- Configuration config = cli.getConfiguration();
+ // start CLI Frontend
+ TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
- checkJobManagerAddress(
- config,
- TEST_JOB_MANAGER_ADDRESS,
- TEST_JOB_MANAGER_PORT);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ RunOptions options =
+ CliFrontendParser.parseRunCommand(new String[] {"-yid", ApplicationId.newInstance(0, 666).toString()});
+
+ frontend.retrieveClient(options);
+ checkJobManagerAddress(
+ frontend.getConfiguration(),
+ TEST_YARN_JOB_MANAGER_ADDRESS,
+ TEST_YARN_JOB_MANAGER_PORT);
+ }
+
+ @Test(expected = IllegalConfigurationException.class)
+ public void testResumeFromYarnIDWithFinishedApplication() throws Exception {
+ File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+
+ // start CLI Frontend
+ TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+
+ RunOptions options =
+ CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
+
+ frontend.retrieveClient(options);
+
+ checkJobManagerAddress(
+ frontend.getConfiguration(),
+ TEST_YARN_JOB_MANAGER_ADDRESS,
+ TEST_YARN_JOB_MANAGER_PORT);
}
@Test
- public void testManualOptionsOverridesYarn() {
- try {
- File emptyFolder = temporaryFolder.newFolder();
- TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath());
+ public void testYarnIDOverridesPropertiesFile() throws Exception {
+ File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
+
+ // start CLI Frontend
+ TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+
+ RunOptions options =
+ CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
+
+ frontend.retrieveClient(options);
+
+ checkJobManagerAddress(
+ frontend.getConfiguration(),
+ TEST_YARN_JOB_MANAGER_ADDRESS,
+ TEST_YARN_JOB_MANAGER_PORT);
+ }
+
+
+ @Test
+ public void testManualOptionsOverridesYarn() throws Exception {
+
+ File emptyFolder = temporaryFolder.newFolder();
+ File testConfFile = new File(emptyFolder.getAbsolutePath(), "flink-conf.yaml");
+ Files.createFile(testConfFile.toPath());
- CommandLineOptions options = mock(CommandLineOptions.class);
- when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
+ TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath());
- frontend.updateConfig(options);
+ RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"});
- Configuration config = frontend.getConfiguration();
+ frontend.retrieveClient(options);
- InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
+ Configuration config = frontend.getConfiguration();
- checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
+ InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
+
+ checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
+
+ }
+
+
+ ///////////
+ // Utils //
+ ///////////
+
+ private File writeYarnPropertiesFile(String contents) throws IOException {
+ File tmpFolder = temporaryFolder.newFolder();
+ String currentUser = System.getProperty("user.name");
+
+ // copy .yarn-properties-<username>
+ File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
+ Files.write(testPropertiesFile.toPath(), contents.getBytes(), StandardOpenOption.CREATE);
+
+ // copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
+ String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
+ File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
+ Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
+
+ return tmpFolder.getAbsoluteFile();
+ }
+
+ private static class TestCLI extends CliFrontend {
+ TestCLI(String configDir) throws Exception {
+ super(configDir);
+ }
+
+ @Override
+ // make method public
+ public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
+ return super.getClient(options, programName);
+ }
+
+ @Override
+ // make method public
+ public ClusterClient retrieveClient(CommandLineOptions options) {
+ return super.retrieveClient(options);
+ }
+ }
+
+
+ /**
+ * Injects an extended FlinkYarnSessionCli that deals with mocking Yarn communication
+ */
+ private static class CustomYarnTestCLI extends TestCLI {
+
+ // the default application status for yarn applications to be retrieved
+ private final FinalApplicationStatus finalApplicationStatus;
+
+ CustomYarnTestCLI(String configDir) throws Exception {
+ this(configDir, FinalApplicationStatus.UNDEFINED);
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+
+ CustomYarnTestCLI(String configDir, FinalApplicationStatus finalApplicationStatus) throws Exception {
+ super(configDir);
+ this.finalApplicationStatus = finalApplicationStatus;
+ }
+
+ @Override
+ public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+ // inject the testing FlinkYarnSessionCli
+ return new TestingYarnSessionCli();
+ }
+
+ /**
+ * Testing FlinkYarnSessionCli which returns a modified cluster descriptor for testing.
+ */
+ private class TestingYarnSessionCli extends FlinkYarnSessionCli {
+ TestingYarnSessionCli() {
+ super("y", "yarn");
+ }
+
+ @Override
+ // override cluster descriptor to replace the YarnClient
+ protected AbstractYarnClusterDescriptor getClusterDescriptor() {
+ return new TestingYarnClusterDescriptor();
+ }
+
+ /**
+ * Replace the YarnClient for this test.
+ */
+ private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
+
+ @Override
+ protected YarnClient getYarnClient() {
+ return new TestYarnClient();
+ }
+
+ @Override
+ protected YarnClusterClient createYarnClusterClient(
+ AbstractYarnClusterDescriptor descriptor,
+ YarnClient yarnClient,
+ ApplicationReport report,
+ Configuration flinkConfiguration,
+ Path sessionFilesDir,
+ boolean perJobCluster) throws IOException, YarnException {
+
+ return Mockito.mock(YarnClusterClient.class);
+ }
+
+
+ private class TestYarnClient extends YarnClientImpl {
+
+ private final List<ApplicationReport> reports = new LinkedList<>();
+
+ TestYarnClient() {
+ { // a report of the Yarn application we want to resume from
+ ApplicationReport report = Mockito.mock(ApplicationReport.class);
+ Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
+ Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
+ Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
+ Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+ this.reports.add(report);
+ }
+ { // a second report, just for noise
+ ApplicationReport report = Mockito.mock(ApplicationReport.class);
+ Mockito.when(report.getHost()).thenReturn("1.2.3.4");
+ Mockito.when(report.getRpcPort()).thenReturn(-123);
+ Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0));
+ Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+ this.reports.add(report);
+ }
+ }
+
+ @Override
+ public List<ApplicationReport> getApplications() throws YarnException, IOException {
+ return reports;
+ }
+
+ @Override
+ public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
+ for (ApplicationReport report : reports) {
+ if (report.getApplicationId().equals(appId)) {
+ return report;
+ }
+ }
+ throw new YarnException();
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index c842bdc..f71dd63 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -23,7 +23,6 @@ import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.flink.client.CliFrontend;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
@@ -37,8 +36,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
-
public class FlinkYarnSessionCliTest {
@Rule
@@ -53,9 +50,10 @@ public class FlinkYarnSessionCliTest {
fakeConf.createNewFile();
map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
TestBaseUtils.setEnv(map);
- Options options = new Options();
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false);
- cli.addOptions(options);
+ Options options = new Options();
+ cli.addGeneralOptions(options);
+ cli.addRunOptions(options);
CommandLineParser parser = new PosixParser();
CommandLine cmd = null;
@@ -66,7 +64,7 @@ public class FlinkYarnSessionCliTest {
Assert.fail("Parsing failed with " + e.getMessage());
}
- YarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
+ AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
Assert.assertNotNull(flinkYarnDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c471fa4..aebb14d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
@@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -73,18 +75,8 @@ import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
/**
-* All classes in this package contain code taken from
-* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
-* and
-* https://github.com/hortonworks/simple-yarn-app
-* and
-* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
-*
-* The Flink jar is uploaded to HDFS by this client.
-* The application master and all the TaskManager containers get the jar file downloaded
-* by YARN into their local fs.
-*
-*/
+ * The descriptor with deployment information for spawning or resuming a {@link YarnClusterClient}.
+ */
public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> {
private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
@@ -132,7 +124,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private boolean detached;
- private String customName = null;
+ private String customName;
+
public AbstractYarnClusterDescriptor() {
// for unit tests only
@@ -321,49 +314,112 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
* Gets a Hadoop Yarn client
* @return Returns a YarnClient which has to be shutdown manually
*/
- public static YarnClient getYarnClient(Configuration conf) {
+ protected YarnClient getYarnClient() {
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
return yarnClient;
}
- @Override
- public YarnClusterClient deploy() throws Exception {
+ /**
+ * Retrieves the Yarn application and cluster from the config
+ * @param config The config with entries to retrieve the cluster
+ * @return YarnClusterClient
+ * @deprecated This should be removed in the future
+ */
+ public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config)
+ throws UnsupportedOperationException {
+ String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+ int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
- UserGroupInformation.setConfiguration(conf);
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ if (jobManagerHost != null && jobManagerPort != -1) {
- if (UserGroupInformation.isSecurityEnabled()) {
- if (!ugi.hasKerberosCredentials()) {
- throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
- "You may use kinit to authenticate and request a TGT from the Kerberos server.");
+ YarnClient yarnClient = getYarnClient();
+ final List<ApplicationReport> applicationReports;
+ try {
+ applicationReports = yarnClient.getApplications();
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't get Yarn application reports", e);
}
- return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
- @Override
- public YarnClusterClient run() throws Exception {
- return deployInternal();
+ for (ApplicationReport report : applicationReports) {
+ if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) {
+ LOG.info("Found application '{}' " +
+ "with JobManager host name '{}' and port '{}' from Yarn properties file.",
+ report.getApplicationId(), jobManagerHost, jobManagerPort);
+ return retrieve(report.getApplicationId().toString());
}
- });
- } else {
- return deployInternal();
+ }
+
}
+
+ LOG.warn("Couldn't retrieve Yarn cluster from Flink configuration using JobManager address '{}:{}'",
+ jobManagerHost, jobManagerPort);
+
+ throw new IllegalConfigurationException("Could not resume Yarn cluster from config.");
}
@Override
- public AbstractFlinkYarnCluster attach(String appId) throws Exception {
- // check if required Hadoop environment variables are set. If not, warn user
- if(System.getenv("HADOOP_CONF_DIR") == null &&
- System.getenv("YARN_CONF_DIR") == null) {
- LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
- "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
- "configuration for accessing YARN.");
+ public YarnClusterClient retrieve(String applicationID) {
+
+ try {
+ // check if required Hadoop environment variables are set. If not, warn user
+ if (System.getenv("HADOOP_CONF_DIR") == null &&
+ System.getenv("YARN_CONF_DIR") == null) {
+ LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
+ "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
+ "configuration for accessing YARN.");
+ }
+
+ final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
+ final YarnClient yarnClient = getYarnClient();
+ final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
+
+ if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
+ // Flink cluster is not running anymore
+ LOG.error("The application {} doesn't run anymore. It has previously completed with final status: {}",
+ applicationID, appReport.getFinalApplicationStatus());
+ throw new RuntimeException("The Yarn application " + applicationID + " doesn't run anymore.");
+ }
+
+ LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'",
+ appReport.getHost(), appReport.getRpcPort(), applicationID);
+
+ flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost());
+ flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort());
+
+ return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, sessionFilesDir, false);
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
}
+ }
- final ApplicationId yarnAppId = ConverterUtils.toApplicationId(appId);
+ @Override
+ public YarnClusterClient deploy() {
- return new FlinkYarnCluster(yarnClient, yarnAppId, conf, flinkConfiguration, sessionFilesDir, detached);
+ try {
+
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ if (!ugi.hasKerberosCredentials()) {
+ throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
+ "You may use kinit to authenticate and request a TGT from the Kerberos server.");
+ }
+ return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
+ @Override
+ public YarnClusterClient run() throws Exception {
+ return deployInternal();
+ }
+ });
+ } else {
+ return deployInternal();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't deploy Yarn cluster", e);
+ }
}
+
/**
* This method will block until the ApplicationMaster/JobManager have been
* deployed on YARN.
@@ -377,7 +433,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
// Create application via yarnClient
- final YarnClient yarnClient = getYarnClient(conf);
+ final YarnClient yarnClient = getYarnClient();
final YarnClientApplication yarnApplication = yarnClient.createApplication();
GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
@@ -726,7 +782,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
// the Flink cluster is deployed in YARN. Represent cluster
- return new YarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir);
+ return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
}
/**
@@ -780,40 +836,44 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
@Override
- public String getClusterDescription() throws Exception {
+ public String getClusterDescription() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
- YarnClient yarnClient = getYarnClient(conf);
- YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+ YarnClient yarnClient = getYarnClient();
+ YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
- ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
- List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
- final String format = "|%-16s |%-16s %n";
- ps.printf("|Property |Value %n");
- ps.println("+---------------------------------------+");
- int totalMemory = 0;
- int totalCores = 0;
- for(NodeReport rep : nodes) {
- final Resource res = rep.getCapability();
- totalMemory += res.getMemory();
- totalCores += res.getVirtualCores();
- ps.format(format, "NodeID", rep.getNodeId());
- ps.format(format, "Memory", res.getMemory() + " MB");
- ps.format(format, "vCores", res.getVirtualCores());
- ps.format(format, "HealthReport", rep.getHealthReport());
- ps.format(format, "Containers", rep.getNumContainers());
+ ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
+ List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+ final String format = "|%-16s |%-16s %n";
+ ps.printf("|Property |Value %n");
ps.println("+---------------------------------------+");
+ int totalMemory = 0;
+ int totalCores = 0;
+ for (NodeReport rep : nodes) {
+ final Resource res = rep.getCapability();
+ totalMemory += res.getMemory();
+ totalCores += res.getVirtualCores();
+ ps.format(format, "NodeID", rep.getNodeId());
+ ps.format(format, "Memory", res.getMemory() + " MB");
+ ps.format(format, "vCores", res.getVirtualCores());
+ ps.format(format, "HealthReport", rep.getHealthReport());
+ ps.format(format, "Containers", rep.getNumContainers());
+ ps.println("+---------------------------------------+");
+ }
+ ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
+ List<QueueInfo> qInfo = yarnClient.getAllQueues();
+ for (QueueInfo q : qInfo) {
+ ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
+ q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
+ }
+ yarnClient.stop();
+ return baos.toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't get cluster description", e);
}
- ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
- List<QueueInfo> qInfo = yarnClient.getAllQueues();
- for(QueueInfo q : qInfo) {
- ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
- q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
- }
- yarnClient.stop();
- return baos.toString();
}
public String getSessionFilesDir() {
@@ -918,9 +978,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private static class YarnDeploymentException extends RuntimeException {
private static final long serialVersionUID = -812040641215388943L;
- public YarnDeploymentException() {
- }
-
public YarnDeploymentException(String message) {
super(message);
}
@@ -954,5 +1011,24 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
}
+
+ /**
+ * Creates a YarnClusterClient; may be overridden in tests
+ */
+ protected YarnClusterClient createYarnClusterClient(
+ AbstractYarnClusterDescriptor descriptor,
+ YarnClient yarnClient,
+ ApplicationReport report,
+ org.apache.flink.configuration.Configuration flinkConfiguration,
+ Path sessionFilesDir,
+ boolean perJobCluster) throws IOException, YarnException {
+ return new YarnClusterClient(
+ descriptor,
+ yarnClient,
+ report,
+ flinkConfiguration,
+ sessionFilesDir,
+ perJobCluster);
+ }
}