You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/17 08:45:21 UTC
[07/10] flink git commit: [FLINK-3667] refactor client communication
classes
[FLINK-3667] refactor client communication classes
- ClusterDescriptor: base interface for cluster deployment descriptors
- ClusterDescriptor: YarnClusterDescriptor
- ClusterClient: base class for ClusterClients, handles lifecycle of cluster
- ClusterClient: shares configuration with the implementations
- ClusterClient: StandaloneClusterClient, YarnClusterClient
- ClusterClient: remove run methods and enable detached mode via flag
- CliFrontend: remove all Yarn specific logic
- CliFrontend: remove all cluster setup logic
- CustomCommandLine: interface for other cluster implementations
- Customcommandline: enables creation of new cluster or resuming from existing
- Yarn: move Yarn classes and functionality to the yarn module (yarn
properties, yarn interfaces)
- Yarn: improve reliability of cluster startup
- Yarn Tests: only disable parallel execution of ITCases
This closes #1978
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9b52a31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9b52a31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9b52a31
Branch: refs/heads/master
Commit: f9b52a3114a2114e6846091acf3abb294a49615b
Parents: efc344a
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Apr 22 19:52:54 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jun 17 10:37:58 2016 +0200
----------------------------------------------------------------------
.../api/avro/AvroExternalJarProgramITCase.java | 15 +-
.../org/apache/flink/client/CliFrontend.java | 359 ++-----
.../flink/client/FlinkYarnSessionCli.java | 505 ----------
.../org/apache/flink/client/RemoteExecutor.java | 9 +-
.../flink/client/cli/CliFrontendParser.java | 114 ++-
.../flink/client/cli/CustomCommandLine.java | 57 ++
.../client/deployment/ClusterDescriptor.java | 41 +
.../org/apache/flink/client/program/Client.java | 624 ------------
.../flink/client/program/ClusterClient.java | 695 ++++++++++++++
.../client/program/ContextEnvironment.java | 12 +-
.../program/ContextEnvironmentFactory.java | 18 +-
.../client/program/DetachedEnvironment.java | 6 +-
.../client/program/StandaloneClusterClient.java | 98 ++
.../CliFrontendAddressConfigurationTest.java | 125 +--
.../client/CliFrontendPackageProgramTest.java | 5 +-
.../apache/flink/client/CliFrontendRunTest.java | 26 +-
.../flink/client/CliFrontendTestUtils.java | 32 +-
.../TestingClusterClientWithoutActorSystem.java | 55 ++
.../client/program/ClientConnectionTest.java | 2 +-
.../apache/flink/client/program/ClientTest.java | 33 +-
.../program/ExecutionPlanCreationTest.java | 2 +-
.../org/apache/flink/storm/api/FlinkClient.java | 11 +-
.../flink/api/common/JobExecutionResult.java | 3 +
.../flink/api/common/JobSubmissionResult.java | 24 +-
.../main/flink-bin/conf/log4j-cli.properties | 2 +-
.../src/main/flink-bin/yarn-bin/yarn-session.sh | 2 +-
.../operations/DegreesWithExceptionITCase.java | 2 +-
.../ReduceOnEdgesWithExceptionITCase.java | 2 +-
.../ReduceOnNeighborsWithExceptionITCase.java | 2 +-
.../webmonitor/handlers/JarActionHandler.java | 4 +-
.../apache/flink/runtime/client/JobClient.java | 17 +-
.../clusterframework/ApplicationStatus.java | 1 +
.../clusterframework/FlinkResourceManager.java | 2 +-
.../messages/GetClusterStatusResponse.java | 2 +-
.../runtime/yarn/AbstractFlinkYarnClient.java | 143 ---
.../runtime/yarn/AbstractFlinkYarnCluster.java | 123 ---
.../org/apache/flink/api/scala/FlinkShell.scala | 82 +-
.../flink/api/scala/ExecutionEnvironment.scala | 2 +-
.../elasticsearch2/ElasticsearchSinkITCase.java | 2 +-
.../environment/RemoteStreamEnvironment.java | 9 +-
.../environment/StreamContextEnvironment.java | 5 +-
.../RemoteEnvironmentITCase.java | 2 +-
.../flink/test/misc/AutoParallelismITCase.java | 2 +-
.../test/recovery/SimpleRecoveryITCase.java | 2 +-
flink-yarn-tests/pom.xml | 15 +-
...CliFrontendYarnAddressConfigurationTest.java | 220 +++++
.../flink/yarn/FlinkYarnSessionCliTest.java | 14 +-
.../flink/yarn/TestingFlinkYarnClient.java | 71 --
.../yarn/TestingYarnClusterDescriptor.java | 71 ++
.../flink/yarn/YARNHighAvailabilityITCase.java | 9 +-
.../YARNSessionCapacitySchedulerITCase.java | 6 +-
.../flink/yarn/YARNSessionFIFOITCase.java | 20 +-
.../org/apache/flink/yarn/YarnTestBase.java | 4 +-
.../yarn/AbstractYarnClusterDescriptor.java | 943 +++++++++++++++++++
.../org/apache/flink/yarn/FlinkYarnClient.java | 28 -
.../apache/flink/yarn/FlinkYarnClientBase.java | 907 ------------------
.../org/apache/flink/yarn/FlinkYarnCluster.java | 559 -----------
.../flink/yarn/YarnApplicationMasterRunner.java | 7 +-
.../apache/flink/yarn/YarnClusterClient.java | 577 ++++++++++++
.../flink/yarn/YarnClusterDescriptor.java | 28 +
.../flink/yarn/cli/FlinkYarnSessionCli.java | 606 ++++++++++++
.../apache/flink/yarn/ApplicationClient.scala | 8 +-
.../org/apache/flink/yarn/YarnMessages.scala | 7 +-
63 files changed, 3799 insertions(+), 3580 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index ac10074..29a7e58 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -19,19 +19,12 @@
package org.apache.flink.api.avro;
import java.io.File;
-import java.net.InetAddress;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.RemoteExecutor;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
@@ -64,10 +57,10 @@ public class AvroExternalJarProgramITCase {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
- Client client = new Client(config);
+ ClusterClient client = new StandaloneClusterClient(config);
client.setPrintStatusDuringExecution(false);
- client.runBlocking(program, 4);
+ client.run(program, 4);
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 6d972bc..cf7a8c2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -20,8 +20,6 @@ package org.apache.flink.client;
import akka.actor.ActorSystem;
-import org.apache.commons.cli.CommandLine;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
@@ -31,18 +29,21 @@ import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.InfoOptions;
import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.cli.SavepointOptions;
import org.apache.flink.client.cli.StopOptions;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -53,7 +54,6 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -68,8 +68,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSucc
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
@@ -81,10 +79,8 @@ import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.text.SimpleDateFormat;
@@ -93,10 +89,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
@@ -121,20 +115,6 @@ public class CliFrontend {
private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
- // YARN-session related constants
- public static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
- public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
- public static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
- public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
-
- public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
-
- /**
- * A special host name used to run a job by deploying Flink into a YARN cluster,
- * if this string is specified as the JobManager address
- */
- public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
-
// --------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
@@ -149,12 +129,9 @@ public class CliFrontend {
private ActorSystem actorSystem;
- private AbstractFlinkYarnCluster yarnCluster;
-
/**
*
- * @throws Exception Thrown if the configuration directory was not found, the configuration could not
- * be loaded, or the YARN properties could not be parsed.
+ * @throws Exception Thrown if the configuration directory was not found, the configuration could not be loaded
*/
public CliFrontend() throws Exception {
this(getConfigurationDirectoryFromEnv());
@@ -171,61 +148,6 @@ public class CliFrontend {
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
this.config = GlobalConfiguration.getConfiguration();
- // load the YARN properties
- File propertiesFile = new File(getYarnPropertiesLocation(config));
- if (propertiesFile.exists()) {
-
- logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
-
- Properties yarnProperties = new Properties();
- try {
- try (InputStream is = new FileInputStream(propertiesFile)) {
- yarnProperties.load(is);
- }
- }
- catch (IOException e) {
- throw new Exception("Cannot read the YARN properties file", e);
- }
-
- // configure the default parallelism from YARN
- String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
- if (propParallelism != null) { // maybe the property is not set
- try {
- int parallelism = Integer.parseInt(propParallelism);
- this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
-
- logAndSysout("YARN properties set default parallelism to " + parallelism);
- }
- catch (NumberFormatException e) {
- throw new Exception("Error while parsing the YARN properties: " +
- "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.");
- }
- }
-
- // get the JobManager address from the YARN properties
- String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
- InetSocketAddress jobManagerAddress;
- if (address != null) {
- try {
- jobManagerAddress = ClientUtils.parseHostPortAddress(address);
- // store address in config from where it is retrieved by the retrieval service
- writeJobManagerAddressToConfig(jobManagerAddress);
- }
- catch (Exception e) {
- throw new Exception("YARN properties contain an invalid entry for JobManager address.", e);
- }
-
- logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
- }
-
- // handle the YARN client's dynamic properties
- String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
- Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
- for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
- this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
- }
- }
-
try {
FileSystem.setDefaultScheme(config);
} catch (IOException e) {
@@ -301,61 +223,33 @@ public class CliFrontend {
return handleError(t);
}
- int exitCode = 1;
+ ClusterClient client = null;
try {
- int userParallelism = options.getParallelism();
- LOG.debug("User parallelism is set to {}", userParallelism);
- Client client = getClient(options, program.getMainClassName(), userParallelism, options.getDetachedMode());
+ client = getClient(options, program.getMainClassName());
client.setPrintStatusDuringExecution(options.getStdoutLogging());
+ client.setDetached(options.getDetachedMode());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
LOG.debug("Savepoint path is set to {}", options.getSavepointPath());
- try {
- if (client.getMaxSlots() != -1 && userParallelism == -1) {
- logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
- "To use another parallelism, set it at the ./bin/flink client.");
- userParallelism = client.getMaxSlots();
- }
-
- // detached mode
- if (options.getDetachedMode() || (yarnCluster != null && yarnCluster.isDetached())) {
- exitCode = executeProgramDetached(program, client, userParallelism);
- }
- else {
- exitCode = executeProgramBlocking(program, client, userParallelism);
- }
-
- // show YARN cluster status if its not a detached YARN cluster.
- if (yarnCluster != null && !yarnCluster.isDetached()) {
- List<String> msgs = yarnCluster.getNewMessages();
- if (msgs != null && msgs.size() > 1) {
-
- logAndSysout("The following messages were created by the YARN cluster while running the Job:");
- for (String msg : msgs) {
- logAndSysout(msg);
- }
- }
- if (yarnCluster.hasFailed()) {
- logAndSysout("YARN cluster is in failed state!");
- logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
- }
- }
-
- return exitCode;
- }
- finally {
- client.shutdown();
+ int userParallelism = options.getParallelism();
+ LOG.debug("User parallelism is set to {}", userParallelism);
+ if (client.getMaxSlots() != -1 && userParallelism == -1) {
+ logAndSysout("Using the parallelism provided by the remote cluster ("
+ + client.getMaxSlots()+"). "
+ + "To use another parallelism, set it at the ./bin/flink client.");
+ userParallelism = client.getMaxSlots();
}
+
+ return executeProgram(program, client, userParallelism);
}
catch (Throwable t) {
return handleError(t);
}
finally {
- if (yarnCluster != null && !yarnCluster.isDetached()) {
- logAndSysout("Shutting down YARN cluster");
- yarnCluster.shutdown(exitCode != 0);
+ if (client != null) {
+ client.shutdown();
}
if (program != null) {
program.deleteExtractedLibraries();
@@ -410,7 +304,7 @@ public class CliFrontend {
LOG.info("Creating program plan dump");
Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
- FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);
+ FlinkPlan flinkPlan = ClusterClient.getOptimizedPlan(compiler, program, parallelism);
String jsonPlan = null;
if (flinkPlan instanceof OptimizedPlan) {
@@ -830,53 +724,30 @@ public class CliFrontend {
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------
- protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
- LOG.info("Starting execution of program");
+ protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
+ logAndSysout("Starting execution of program");
JobSubmissionResult result;
try {
- result = client.runDetached(program, parallelism);
+ result = client.run(program, parallelism);
} catch (ProgramInvocationException e) {
return handleError(e);
} finally {
program.deleteExtractedLibraries();
}
- if (yarnCluster != null) {
- yarnCluster.stopAfterJob(result.getJobID());
- yarnCluster.disconnect();
- }
-
- System.out.println("Job has been submitted with JobID " + result.getJobID());
-
- return 0;
- }
-
- protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
- LOG.info("Starting execution of program");
-
- JobSubmissionResult result;
- try {
- result = client.runBlocking(program, parallelism);
- }
- catch (ProgramInvocationException e) {
- return handleError(e);
- }
- finally {
- program.deleteExtractedLibraries();
- }
-
- LOG.info("Program execution finished");
-
- if (result instanceof JobExecutionResult) {
- JobExecutionResult execResult = (JobExecutionResult) result;
+ if(result.isJobExecutionResults()) {
+ logAndSysout("Program execution finished");
+ JobExecutionResult execResult = result.getJobExecutionResult();
System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0) {
- System.out.println("Accumulator Results: ");
- System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+ System.out.println("Accumulator Results: ");
+ System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
+ } else {
+ logAndSysout("Job has been submitted with JobID " + result.getJobID());
}
return 0;
@@ -923,16 +794,6 @@ public class CliFrontend {
}
/**
- * Writes the given job manager address to the associated configuration object
- *
- * @param address Address to write to the configuration
- */
- protected void writeJobManagerAddressToConfig(InetSocketAddress address) {
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
- }
-
- /**
* Updates the associated configuration with the given command line options
*
* @param options Command line options
@@ -940,7 +801,7 @@ public class CliFrontend {
protected void updateConfig(CommandLineOptions options) {
if(options.getJobManagerAddress() != null){
InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
- writeJobManagerAddressToConfig(jobManagerAddress);
+ writeJobManagerAddressToConfig(config, jobManagerAddress);
}
}
@@ -980,110 +841,65 @@ public class CliFrontend {
}
/**
- * Retrieves a {@link Client} object from the given command line options and other parameters.
+ * Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
*
* @param options Command line options which contain JobManager address
* @param programName Program name
- * @param userParallelism Given user parallelism
* @throws Exception
*/
- protected Client getClient(
+ protected ClusterClient getClient(
CommandLineOptions options,
- String programName,
- int userParallelism,
- boolean detachedMode)
+ String programName)
throws Exception {
InetSocketAddress jobManagerAddress;
- int maxSlots = -1;
- if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
- logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
+ // try to get the JobManager address via command-line args
+ if (options.getJobManagerAddress() != null) {
- // Default yarn application name to use, if nothing is specified on the command line
- String applicationName = "Flink Application: " + programName;
+ // Get the custom command-lines (e.g. Yarn/Mesos)
+ CustomCommandLine<?> activeCommandLine =
+ CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
- // user wants to run Flink in YARN cluster.
- CommandLine commandLine = options.getCommandLine();
- AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser
- .getFlinkYarnSessionCli()
- .withDefaultApplicationName(applicationName)
- .createFlinkYarnClient(commandLine);
+ if (activeCommandLine != null) {
+ logAndSysout(activeCommandLine.getIdentifier() + " mode detected. Switching Log4j output to console");
- if (flinkYarnClient == null) {
- throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
- }
+ // Default yarn application name to use, if nothing is specified on the command line
+ String applicationName = "Flink Application: " + programName;
- // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
- // from yarn options.
- if (detachedMode) {
- flinkYarnClient.setDetachedMode(true);
- }
+ ClusterClient client = activeCommandLine.createClient(applicationName, options.getCommandLine());
- // the number of slots available from YARN:
- int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
- if (yarnTmSlots == -1) {
- yarnTmSlots = 1;
- }
- maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
- if (userParallelism != -1) {
- int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
- logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
- "but the user requested a parallelism of " + userParallelism + " on YARN. " +
- "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
- "will get "+slotsPerTM+" slots.");
- flinkYarnClient.setTaskManagerSlots(slotsPerTM);
- }
+ logAndSysout("Cluster started");
+ logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
- try {
- yarnCluster = flinkYarnClient.deploy();
- yarnCluster.connectToCluster();
- }
- catch (Exception e) {
- throw new RuntimeException("Error deploying the YARN cluster", e);
+ return client;
+ } else {
+ // job manager address supplied on the command-line
+ LOG.info("Using address {} to connect to JobManager.", options.getJobManagerAddress());
+ jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
+ writeJobManagerAddressToConfig(config, jobManagerAddress);
+ return new StandaloneClusterClient(config);
}
- jobManagerAddress = yarnCluster.getJobManagerAddress();
- writeJobManagerAddressToConfig(jobManagerAddress);
-
- // overwrite the yarn client config (because the client parses the dynamic properties)
- this.config.addAll(flinkYarnClient.getFlinkConfiguration());
-
- logAndSysout("YARN cluster started");
- logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
- logAndSysout("Waiting until all TaskManagers have connected");
-
- while(true) {
- GetClusterStatusResponse status = yarnCluster.getClusterStatus();
- if (status != null) {
- if (status.numRegisteredTaskManagers() < flinkYarnClient.getTaskManagerCount()) {
- logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
- + flinkYarnClient.getTaskManagerCount() + ")");
- } else {
- logAndSysout("All TaskManagers are connected");
- break;
- }
- } else {
- logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
- }
-
- try {
- Thread.sleep(500);
- }
- catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for TaskManagers");
- System.err.println("Thread is interrupted");
- Thread.currentThread().interrupt();
+ // try to get the JobManager address via resuming of a cluster
+ } else {
+ for (CustomCommandLine cli : CliFrontendParser.getAllCustomCommandLine().values()) {
+ ClusterClient client = cli.retrieveCluster(config);
+ if (client != null) {
+ LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
+ return client;
}
}
}
- else {
- if(options.getJobManagerAddress() != null) {
- jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
- writeJobManagerAddressToConfig(jobManagerAddress);
- }
- }
- return new Client(config, maxSlots);
+ // read JobManager address from the config
+ if (config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) != null) {
+ return new StandaloneClusterClient(config);
+ // We tried hard but couldn't find a JobManager address
+ } else {
+ throw new IllegalConfigurationException(
+ "The JobManager address is neither provided at the command-line, " +
+ "nor configured in flink-conf.yaml.");
+ }
}
// --------------------------------------------------------------------------------------------
@@ -1275,33 +1091,16 @@ public class CliFrontend {
return location;
}
- public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
- if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
- Map<String, String> properties = new HashMap<>();
-
- String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
- for (String propLine : propertyLines) {
- if (propLine == null) {
- continue;
- }
-
- String[] kv = propLine.split("=");
- if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) {
- properties.put(kv[0], kv[1]);
- }
- }
- return properties;
- }
- else {
- return Collections.emptyMap();
- }
- }
-
- public static String getYarnPropertiesLocation(Configuration conf) {
- String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
- String currentUser = System.getProperty("user.name");
- String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
- return propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser;
+ /**
+ * Writes the given job manager address to the associated configuration object
+ *
+ * @param address Address to write to the configuration
+ * @param config The config to write to
+ */
+ public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) {
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
deleted file mode 100644
index bb61ffb..0000000
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.client;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Class handling the command line interface to the YARN session.
- */
-public class FlinkYarnSessionCli {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
-
- //------------------------------------ Constants -------------------------
-
- private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
- public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
- public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
-
- private static final int CLIENT_POLLING_INTERVALL = 3;
-
-
- //------------------------------------ Command Line argument options -------------------------
- // the prefix transformation is used by the CliFrontend static constructor.
- private final Option QUERY;
- // --- or ---
- private final Option QUEUE;
- private final Option SHIP_PATH;
- private final Option FLINK_JAR;
- private final Option JM_MEMORY;
- private final Option TM_MEMORY;
- private final Option CONTAINER;
- private final Option SLOTS;
- private final Option DETACHED;
- private final Option STREAMING;
- private final Option NAME;
-
- /**
- * Dynamic properties allow the user to specify additional configuration values with -D, such as
- * -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
- */
- private final Option DYNAMIC_PROPERTIES;
-
- private final boolean acceptInteractiveInput;
-
- //------------------------------------ Internal fields -------------------------
- private AbstractFlinkYarnCluster yarnCluster = null;
- private boolean detachedMode = false;
-
- /** Default yarn application name. */
- private String defaultApplicationName = null;
-
- public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) {
- this.acceptInteractiveInput = acceptInteractiveInput;
-
- QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
- QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
- SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
- FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
- JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
- TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
- CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
- SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
- DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
- DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
- STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
- NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
- }
-
- /**
- * Creates a new Yarn Client.
- * @param cmd the command line to parse options from
- * @return an instance of the client or null if there was an error
- */
- public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
-
- AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
- if (flinkYarnClient == null) {
- return null;
- }
-
- if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
- LOG.error("Missing required argument " + CONTAINER.getOpt());
- printUsage();
- return null;
- }
- flinkYarnClient.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt())));
-
- // Jar Path
- Path localJarPath;
- if (cmd.hasOption(FLINK_JAR.getOpt())) {
- String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
- if(!userPath.startsWith("file://")) {
- userPath = "file://" + userPath;
- }
- localJarPath = new Path(userPath);
- } else {
- LOG.info("No path for the flink jar passed. Using the location of "+flinkYarnClient.getClass()+" to locate the jar");
- localJarPath = new Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
- }
-
- flinkYarnClient.setLocalJarPath(localJarPath);
-
- // Conf Path
- String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
- GlobalConfiguration.loadConfiguration(confDirPath);
- Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
- flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
- flinkYarnClient.setConfigurationDirectory(confDirPath);
- File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
- if (!confFile.exists()) {
- LOG.error("Unable to locate configuration file in "+confFile);
- return null;
- }
- Path confPath = new Path(confFile.getAbsolutePath());
-
- flinkYarnClient.setConfigurationFilePath(confPath);
-
- List<File> shipFiles = new ArrayList<>();
- // path to directory to ship
- if (cmd.hasOption(SHIP_PATH.getOpt())) {
- String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
- File shipDir = new File(shipPath);
- if (shipDir.isDirectory()) {
- shipFiles = new ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return !(name.equals(".") || name.equals(".."));
- }
- })));
- } else {
- LOG.warn("Ship directory is not a directory. Ignoring it.");
- }
- }
-
- //check if there is a logback or log4j file
- if (confDirPath.length() > 0) {
- File logback = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOGBACK_NAME);
- if (logback.exists()) {
- shipFiles.add(logback);
- flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI()));
- }
- File log4j = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOG4J_NAME);
- if (log4j.exists()) {
- shipFiles.add(log4j);
- if (flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
- // this means there is already a logback configuration file --> fail
- LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and " +
- "Logback configuration files. Please delete or rename one of them.");
- } // else
- flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI()));
- }
- }
-
- flinkYarnClient.setShipFiles(shipFiles);
-
- // queue
- if (cmd.hasOption(QUEUE.getOpt())) {
- flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
- }
-
- // JobManager Memory
- if (cmd.hasOption(JM_MEMORY.getOpt())) {
- int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
- flinkYarnClient.setJobManagerMemory(jmMemory);
- }
-
- // Task Managers memory
- if (cmd.hasOption(TM_MEMORY.getOpt())) {
- int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
- flinkYarnClient.setTaskManagerMemory(tmMemory);
- }
-
- if (cmd.hasOption(SLOTS.getOpt())) {
- int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
- flinkYarnClient.setTaskManagerSlots(slots);
- }
-
- String[] dynamicProperties = null;
- if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
- dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
- }
- String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties,
- CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-
- flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
-
- if (cmd.hasOption(DETACHED.getOpt())) {
- this.detachedMode = true;
- flinkYarnClient.setDetachedMode(detachedMode);
- }
-
- if(cmd.hasOption(NAME.getOpt())) {
- flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt()));
- } else {
- // set the default application name, if none is specified
- if(defaultApplicationName != null) {
- flinkYarnClient.setName(defaultApplicationName);
- }
- }
-
- return flinkYarnClient;
- }
-
-
- private void printUsage() {
- System.out.println("Usage:");
- HelpFormatter formatter = new HelpFormatter();
- formatter.setWidth(200);
- formatter.setLeftPadding(5);
- formatter.setSyntaxPrefix(" Required");
- Options req = new Options();
- req.addOption(CONTAINER);
- formatter.printHelp(" ", req);
-
- formatter.setSyntaxPrefix(" Optional");
- Options opt = new Options();
- opt.addOption(JM_MEMORY);
- opt.addOption(TM_MEMORY);
- opt.addOption(QUERY);
- opt.addOption(QUEUE);
- opt.addOption(SLOTS);
- opt.addOption(DYNAMIC_PROPERTIES);
- opt.addOption(DETACHED);
- opt.addOption(STREAMING);
- opt.addOption(NAME);
- formatter.printHelp(" ", opt);
- }
-
- public static AbstractFlinkYarnClient getFlinkYarnClient() {
- AbstractFlinkYarnClient yarnClient;
- try {
- Class<? extends AbstractFlinkYarnClient> yarnClientClass =
- Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
- yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class);
- }
- catch (ClassNotFoundException e) {
- System.err.println("Unable to locate the Flink YARN Client. " +
- "Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: " +
- e.getMessage());
- e.printStackTrace(System.err);
- return null; // make it obvious
- }
- return yarnClient;
- }
-
- private static void writeYarnProperties(Properties properties, File propertiesFile) {
- try {
- OutputStream out = new FileOutputStream(propertiesFile);
- properties.store(out, "Generated YARN properties file");
- out.close();
- } catch (IOException e) {
- throw new RuntimeException("Error writing the properties file", e);
- }
- propertiesFile.setReadable(true, false); // readable for all.
- }
-
- public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster, boolean readConsoleInput) {
- final String HELP = "Available commands:\n" +
- "help - show these commands\n" +
- "stop - stop the YARN session";
- int numTaskmanagers = 0;
- try {
- BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
- label:
- while (true) {
- // ------------------ check if there are updates by the cluster -----------
-
- GetClusterStatusResponse status = yarnCluster.getClusterStatus();
- LOG.debug("Received status message: {}", status);
-
- if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
- System.err.println("Number of connected TaskManagers changed to " +
- status.numRegisteredTaskManagers() + ". " +
- "Slots available: " + status.totalNumberOfSlots());
- numTaskmanagers = status.numRegisteredTaskManagers();
- }
-
- List<String> messages = yarnCluster.getNewMessages();
- if (messages != null && messages.size() > 0) {
- System.err.println("New messages from the YARN cluster: ");
- for (String msg : messages) {
- System.err.println(msg);
- }
- }
-
- if (yarnCluster.hasFailed()) {
- System.err.println("The YARN cluster has failed");
- yarnCluster.shutdown(true);
- }
-
- // wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
- long startTime = System.currentTimeMillis();
- while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
- && (!readConsoleInput || !in.ready()))
- {
- Thread.sleep(200);
- }
- //------------- handle interactive command by user. ----------------------
-
- if (readConsoleInput && in.ready()) {
- String command = in.readLine();
- switch (command) {
- case "quit":
- case "stop":
- break label;
-
- case "help":
- System.err.println(HELP);
- break;
- default:
- System.err.println("Unknown command '" + command + "'. Showing help: \n" + HELP);
- break;
- }
- }
-
- if (yarnCluster.hasBeenStopped()) {
- LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
- break;
- }
- }
- } catch(Exception e) {
- LOG.warn("Exception while running the interactive command line interface", e);
- }
- }
-
- public static void main(String[] args) {
- FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
- System.exit(cli.run(args));
- }
-
- public void getYARNSessionCLIOptions(Options options) {
- options.addOption(FLINK_JAR);
- options.addOption(JM_MEMORY);
- options.addOption(TM_MEMORY);
- options.addOption(CONTAINER);
- options.addOption(QUEUE);
- options.addOption(QUERY);
- options.addOption(SHIP_PATH);
- options.addOption(SLOTS);
- options.addOption(DYNAMIC_PROPERTIES);
- options.addOption(DETACHED);
- options.addOption(STREAMING);
- options.addOption(NAME);
- }
-
- public int run(String[] args) {
- //
- // Command Line Options
- //
- Options options = new Options();
- getYARNSessionCLIOptions(options);
-
- CommandLineParser parser = new PosixParser();
- CommandLine cmd;
- try {
- cmd = parser.parse(options, args);
- } catch(Exception e) {
- System.out.println(e.getMessage());
- printUsage();
- return 1;
- }
-
- // Query cluster for metrics
- if (cmd.hasOption(QUERY.getOpt())) {
- AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
- String description;
- try {
- description = flinkYarnClient.getClusterDescription();
- } catch (Exception e) {
- System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
- e.printStackTrace(System.err);
- return 1;
- }
- System.out.println(description);
- return 0;
- } else {
- AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd);
-
- if (flinkYarnClient == null) {
- System.err.println("Error while starting the YARN Client. Please check log output!");
- return 1;
- }
-
- try {
- yarnCluster = flinkYarnClient.deploy();
- // only connect to cluster if its not a detached session.
- if(!flinkYarnClient.isDetached()) {
- yarnCluster.connectToCluster();
- }
- } catch (Exception e) {
- System.err.println("Error while deploying YARN cluster: "+e.getMessage());
- e.printStackTrace(System.err);
- return 1;
- }
- //------------------ Cluster deployed, handle connection details
- String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
- System.out.println("Flink JobManager is now running on " + jobManagerAddress);
- System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
-
- // file that we write into the conf/ dir containing the jobManager address and the dop.
- File yarnPropertiesFile = new File(CliFrontend.getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
-
- Properties yarnProps = new Properties();
- yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
- if (flinkYarnClient.getTaskManagerSlots() != -1) {
- String parallelism =
- Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
- yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_PARALLELISM, parallelism);
- }
- // add dynamic properties
- if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
- yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
- flinkYarnClient.getDynamicPropertiesEncoded());
- }
- writeYarnProperties(yarnProps, yarnPropertiesFile);
-
- //------------------ Cluster running, let user control it ------------
-
- if (detachedMode) {
- // print info and quit:
- LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
- "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
- "yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
- "Please also note that the temporary files of the YARN session in {} will not be removed.",
- flinkYarnClient.getSessionFilesDir());
- } else {
- runInteractiveCli(yarnCluster, acceptInteractiveInput);
-
- if (!yarnCluster.hasBeenStopped()) {
- LOG.info("Command Line Interface requested session shutdown");
- yarnCluster.shutdown(false);
- }
-
- try {
- yarnPropertiesFile.delete();
- } catch (Exception e) {
- LOG.warn("Exception while deleting the JobManager address file", e);
- }
- }
- }
- return 0;
- }
-
- /**
- * Sets the default Yarn Application Name.
- * @param defaultApplicationName the name of the yarn application to use
- * @return FlinkYarnSessionCli instance, for chaining
- */
- public FlinkYarnSessionCli withDefaultApplicationName(String defaultApplicationName) {
- this.defaultApplicationName = defaultApplicationName;
- return this;
- }
-
- /**
- * Utility method for tests.
- */
- public void stop() {
- if (yarnCluster != null) {
- LOG.info("Command line interface is shutting down the yarnCluster");
- yarnCluster.shutdown(false);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index ab70453..86b36b3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -27,8 +27,9 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.configuration.ConfigConstants;
@@ -57,7 +58,7 @@ public class RemoteExecutor extends PlanExecutor {
private final Configuration clientConfiguration;
- private Client client;
+ private ClusterClient client;
private int defaultParallelism = 1;
@@ -149,7 +150,7 @@ public class RemoteExecutor extends PlanExecutor {
public void start() throws Exception {
synchronized (lock) {
if (client == null) {
- client = new Client(clientConfiguration);
+ client = new StandaloneClusterClient(clientConfiguration);
client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
}
else {
@@ -207,7 +208,7 @@ public class RemoteExecutor extends PlanExecutor {
}
try {
- return client.runBlocking(program, defaultParallelism);
+ return client.run(program, defaultParallelism).getJobExecutionResult();
}
finally {
if (shutDownAtEnd) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index b75952e..f28d1b6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -24,8 +24,16 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* A simple command line parser (based on Apache Commons CLI) that extracts command
@@ -33,9 +41,17 @@ import org.apache.flink.client.FlinkYarnSessionCli;
*/
public class CliFrontendParser {
+ private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class);
+
+
/** command line interface of the YARN session, with a special initialization here
* to prefix all options with y/yarn. */
- private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn", true);
+ private static final Map<String, CustomCommandLine> customCommandLine = new HashMap<>(1);
+
+ static {
+ // we could easily add more here in the future
+ loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
+ }
static final Option HELP_OPTION = new Option("h", "help", false,
@@ -43,7 +59,7 @@ public class CliFrontendParser {
static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
- public static final Option CLASS_OPTION = new Option("c", "class", true,
+ static final Option CLASS_OPTION = new Option("c", "class", true,
"Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " +
"JAR file does not specify the class in its manifest.");
@@ -53,23 +69,23 @@ public class CliFrontendParser {
"times for specifying more than one URL. The protocol must be supported by the " +
"{@link java.net.URLClassLoader}.");
- static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
+ public static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
"The parallelism with which to run the program. Optional flag to override the default value " +
"specified in the configuration.");
static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
"supress logging output to standard out.");
- static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
+ public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
"the job in detached mode");
static final Option ARGS_OPTION = new Option("a", "arguments", true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
- "Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER +
- "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
- "different JobManager than the one specified in the configuration.");
+ "Address of the JobManager (master) to which to connect. " +
+ "Specify " + getCliIdentifierString() +" as the JobManager to deploy a cluster for the job. " +
+ "Use this flag to connect to a different JobManager than the one specified in the configuration.");
static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
"Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).");
@@ -143,8 +159,10 @@ public class CliFrontendParser {
options.addOption(DETACHED_OPTION);
options.addOption(SAVEPOINT_PATH_OPTION);
- // also add the YARN options so that the parser can parse them
- yarnSessionCLi.getYARNSessionCLIOptions(options);
+ for (CustomCommandLine customCLI : customCommandLine.values()) {
+ customCLI.addOptions(options);
+ }
+
return options;
}
@@ -240,10 +258,16 @@ public class CliFrontendParser {
System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>");
formatter.setSyntaxPrefix(" \"run\" action options:");
formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
- formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
- Options yarnOpts = new Options();
- yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
- formatter.printHelp(" ", yarnOpts);
+
+ // prints options from all available command-line classes
+ for (Map.Entry<String, CustomCommandLine> entry: customCommandLine.entrySet()) {
+ formatter.setSyntaxPrefix(" Additional arguments if -m " + entry.getKey() + " is set:");
+ Options customOpts = new Options();
+ entry.getValue().addOptions(customOpts);
+ formatter.printHelp(" ", customOpts);
+ System.out.println();
+ }
+
System.out.println();
}
@@ -376,7 +400,63 @@ public class CliFrontendParser {
}
}
- public static FlinkYarnSessionCli getFlinkYarnSessionCli() {
- return yarnSessionCLi;
+ public static Map<String, CustomCommandLine> getAllCustomCommandLine() {
+ if (customCommandLine.isEmpty()) {
+ LOG.warn("No custom command-line classes were loaded.");
+ }
+ return Collections.unmodifiableMap(customCommandLine);
+ }
+
+ private static String getCliIdentifierString() {
+ StringBuilder builder = new StringBuilder();
+ boolean first = true;
+ for (String identifier : customCommandLine.keySet()) {
+ if (!first) {
+ builder.append(", ");
+ }
+ first = false;
+ builder.append("'").append(identifier).append("'");
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Gets the custom command-line for this identifier.
+ * @param identifier The unique identifier for this command-line implementation.
+ * @return CustomCommandLine or null if none was found
+ */
+ public static CustomCommandLine getActiveCustomCommandLine(String identifier) {
+ return CliFrontendParser.getAllCustomCommandLine().get(identifier);
}
+
+ private static void loadCustomCommandLine(String className, Object... params) {
+
+ try {
+ Class<? extends CustomCommandLine> customCliClass =
+ Class.forName(className).asSubclass(CustomCommandLine.class);
+
+ // construct class types from the parameters
+ Class<?>[] types = new Class<?>[params.length];
+ for (int i = 0; i < params.length; i++) {
+ Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
+ types[i] = params[i].getClass();
+ }
+
+ Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
+ final CustomCommandLine cli = constructor.newInstance(params);
+
+ String cliIdentifier = Preconditions.checkNotNull(cli.getIdentifier());
+ CustomCommandLine existing = customCommandLine.put(cliIdentifier, cli);
+
+ if (existing != null) {
+ throw new IllegalStateException("Attempted to register " + cliIdentifier +
+ " but there is already a command-line with this identifier.");
+ }
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
+ | InvocationTargetException e) {
+ LOG.warn("Unable to locate custom CLI class {}. " +
+ "Flink is not compiled with support for this class.", className, e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
new file mode 100644
index 0000000..cd5e0e6
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+
+/**
+ * Custom command-line interface to load hooks for the command-line interface.
+ */
+public interface CustomCommandLine<ClusterType extends ClusterClient> {
+
+ /**
+ * Returns a unique identifier for this custom command-line.
+ * @return An unique identifier string
+ */
+ String getIdentifier();
+
+ /**
+ * Adds custom options to the existing options.
+ * @param baseOptions The existing options.
+ */
+ void addOptions(Options baseOptions);
+
+ /**
+ * Retrieves a client for a running cluster
+ * @param config The Flink config
+ * @return Client if a cluster could be retrieve, null otherwise
+ */
+ ClusterClient retrieveCluster(Configuration config) throws Exception;
+
+ /**
+ * Creates the client for the cluster
+ * @param applicationName The application name to use
+ * @param commandLine The command-line options parsed by the CliFrontend
+ * @return The client to communicate with the cluster which the CustomCommandLine brought up.
+ */
+ ClusterType createClient(String applicationName, CommandLine commandLine) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
new file mode 100644
index 0000000..cf0595b
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+
+import org.apache.flink.client.program.ClusterClient;
+
+/**
+ * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
+ */
+public interface ClusterDescriptor<ClientType extends ClusterClient> {
+
+ /**
+ * Returns a String containing details about the cluster (NodeManagers, available memory, ...)
+ *
+ */
+ String getClusterDescription() throws Exception;
+
+ /**
+ * Triggers deployment of a cluster
+ * @return Client for the cluster
+ * @throws Exception
+ */
+ ClientType deploy() throws Exception;
+}