You are viewing a plain-text version of this content; the canonical link to it is not included in this copy.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/17 08:45:22 UTC
[08/10] flink git commit: [FLINK-3937] programmatic resuming of clusters
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index a5b8af7..9130fdd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -55,6 +56,7 @@ import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -77,9 +79,6 @@ public class YarnClusterClient extends ClusterClient {
// (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
private final Path sessionFilesDir;
- /** The leader retrieval service for connecting to the cluster and finding the active leader. */
- private final LeaderRetrievalService leaderRetrievalService;
-
//---------- Class internal fields -------------------
private final AbstractYarnClusterDescriptor clusterDescriptor;
@@ -92,6 +91,7 @@ public class YarnClusterClient extends ClusterClient {
private boolean isConnected = false;
+ private final boolean perJobCluster;
/**
* Create a new Flink on YARN cluster.
@@ -101,6 +101,7 @@ public class YarnClusterClient extends ClusterClient {
* @param appReport the YARN application ID
* @param flinkConfig Flink configuration
* @param sessionFilesDir Location of files required for YARN session
+ * @param perJobCluster Indicator whether this cluster is only created for a single job and then shutdown
* @throws IOException
* @throws YarnException
*/
@@ -109,7 +110,8 @@ public class YarnClusterClient extends ClusterClient {
final YarnClient yarnClient,
final ApplicationReport appReport,
org.apache.flink.configuration.Configuration flinkConfig,
- Path sessionFilesDir) throws IOException, YarnException {
+ Path sessionFilesDir,
+ boolean perJobCluster) throws IOException, YarnException {
super(flinkConfig);
@@ -122,18 +124,16 @@ public class YarnClusterClient extends ClusterClient {
this.applicationId = appReport;
this.appId = appReport.getApplicationId();
this.trackingURL = appReport.getTrackingUrl();
+ this.perJobCluster = perJobCluster;
+ /* The leader retrieval service for connecting to the cluster and finding the active leader. */
+ LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
} catch (Exception e) {
throw new IOException("Could not create the leader retrieval service.", e);
}
-
- if (isConnected) {
- throw new IllegalStateException("Already connected to the cluster.");
- }
-
// start application client
LOG.info("Start application client.");
@@ -182,28 +182,31 @@ public class YarnClusterClient extends ClusterClient {
isConnected = true;
- logAndSysout("Waiting until all TaskManagers have connected");
+ if (perJobCluster) {
- while(true) {
- GetClusterStatusResponse status = getClusterStatus();
- if (status != null) {
- if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
- logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
- + clusterDescriptor.getTaskManagerCount() + ")");
+ logAndSysout("Waiting until all TaskManagers have connected");
+
+ while (true) {
+ GetClusterStatusResponse status = getClusterStatus();
+ if (status != null) {
+ if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
+ logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
+ + clusterDescriptor.getTaskManagerCount() + ")");
+ } else {
+ logAndSysout("All TaskManagers are connected");
+ break;
+ }
} else {
- logAndSysout("All TaskManagers are connected");
- break;
+ logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
}
- } else {
- logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for TaskManagers");
- System.err.println("Thread is interrupted");
- throw new IOException("Interrupted while waiting for TaskManagers", e);
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for TaskManagers");
+ System.err.println("Thread is interrupted");
+ throw new IOException("Interrupted while waiting for TaskManagers", e);
+ }
}
}
}
@@ -214,9 +217,12 @@ public class YarnClusterClient extends ClusterClient {
}
LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");
- if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) {
- LOG.warn("Error while removing the shutdown hook. The YARN session might be killed unintentionally");
+ try {
+ Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
+ } catch (IllegalStateException e) {
+ // we are already in the shutdown hook
}
+
// tell the actor to shut down.
applicationClient.tell(PoisonPill.getInstance(), applicationClient);
@@ -265,12 +271,30 @@ public class YarnClusterClient extends ClusterClient {
@Override
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
- if (isDetached()) {
- JobSubmissionResult result = super.runDetached(jobGraph, classLoader);
+ if (perJobCluster) {
stopAfterJob(jobGraph.getJobID());
- return result;
+ }
+
+ if (isDetached()) {
+ return super.runDetached(jobGraph, classLoader);
} else {
- return super.run(jobGraph, classLoader);
+ try {
+ return super.run(jobGraph, classLoader);
+ } finally {
+ // show cluster status
+ List<String> msgs = getNewMessages();
+ if (msgs != null && msgs.size() > 1) {
+
+ logAndSysout("The following messages were created by the YARN cluster while running the Job:");
+ for (String msg : msgs) {
+ logAndSysout(msg);
+ }
+ }
+ if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
+ logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus());
+ logAndSysout("YARN Diagnostics: " + getDiagnostics());
+ }
+ }
}
}
@@ -298,8 +322,9 @@ public class YarnClusterClient extends ClusterClient {
throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
}
if(hasBeenShutdown()) {
- throw new RuntimeException("The YarnClusterClient has already been stopped");
+ return null;
}
+
Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout);
Object clusterStatus;
try {
@@ -417,32 +442,20 @@ public class YarnClusterClient extends ClusterClient {
@Override
public void finalizeCluster() {
- if (!isConnected) {
- throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
- }
-
- if (isDetached()) {
- // only disconnect if we are running detached
+ if (isDetached() || !perJobCluster) {
+ // only disconnect if we are not running a per job cluster
disconnect();
- return;
+ } else {
+ shutdownCluster();
}
+ }
- // show cluster status
-
- List<String> msgs = getNewMessages();
- if (msgs != null && msgs.size() > 1) {
+ public void shutdownCluster() {
- logAndSysout("The following messages were created by the YARN cluster while running the Job:");
- for (String msg : msgs) {
- logAndSysout(msg);
- }
- }
- if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
- logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus());
- logAndSysout("YARN Diagnostics: " + getDiagnostics());
+ if (!isConnected) {
+ throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
}
-
if(hasBeenShutDown.getAndSet(true)) {
return;
}
@@ -471,13 +484,30 @@ public class YarnClusterClient extends ClusterClient {
actorSystem.awaitTermination();
}
- LOG.info("Deleting files in " + sessionFilesDir);
try {
- FileSystem shutFS = FileSystem.get(hadoopConfig);
- shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
- shutFS.close();
- }catch(IOException e){
- LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
+ File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig);
+ if (propertiesFile.isFile()) {
+ if (propertiesFile.delete()) {
+ LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
+ } else {
+ LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while deleting the JobManager address file", e);
+ }
+
+ if (sessionFilesDir != null) {
+ LOG.info("Deleting files in " + sessionFilesDir);
+ try {
+ FileSystem shutFS = FileSystem.get(hadoopConfig);
+ shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
+ shutFS.close();
+ } catch (IOException e) {
+ LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
+ }
+ } else {
+ LOG.warn("Session file directory not set. Not deleting session files");
}
try {
@@ -571,7 +601,6 @@ public class YarnClusterClient extends ClusterClient {
@Override
public boolean isDetached() {
- // either we have set detached mode using the general '-d' flag or using the Yarn CLI flag 'yd'
return super.isDetached() || clusterDescriptor.isDetachedMode();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 43e7c7b..5f745b2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -17,10 +17,12 @@
*/
package org.apache.flink.yarn;
+
/**
* Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
*/
public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
@Override
protected Class<?> getApplicationMasterClass() {
return YarnApplicationMasterRunner.class;
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fdcc858..5eca4f1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -28,11 +28,9 @@ import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -59,6 +57,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+
/**
* Class handling the command line interface to the YARN session.
*/
@@ -97,8 +97,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
private final Option CONTAINER;
private final Option SLOTS;
private final Option DETACHED;
+ @Deprecated
private final Option STREAMING;
private final Option NAME;
+
+ private final Options ALL_OPTIONS;
/**
* Dynamic properties allow the user to specify additional configuration values with -D, such as
@@ -118,7 +121,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) {
this.acceptInteractiveInput = acceptInteractiveInput;
-
+
QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session");
QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
@@ -132,37 +135,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
+
+ ALL_OPTIONS = new Options();
+ ALL_OPTIONS.addOption(FLINK_JAR);
+ ALL_OPTIONS.addOption(JM_MEMORY);
+ ALL_OPTIONS.addOption(TM_MEMORY);
+ ALL_OPTIONS.addOption(CONTAINER);
+ ALL_OPTIONS.addOption(QUEUE);
+ ALL_OPTIONS.addOption(QUERY);
+ ALL_OPTIONS.addOption(SHIP_PATH);
+ ALL_OPTIONS.addOption(SLOTS);
+ ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES);
+ ALL_OPTIONS.addOption(DETACHED);
+ ALL_OPTIONS.addOption(STREAMING);
+ ALL_OPTIONS.addOption(NAME);
+ ALL_OPTIONS.addOption(APPLICATION_ID);
}
- /**
- * Attaches a new Yarn Client to running YARN application.
- *
- */
- public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) {
- AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
- if (flinkYarnClient == null) {
- return null;
- }
- if (!cmd.hasOption(APPLICATION_ID.getOpt())) {
- LOG.error("Missing required argument " + APPLICATION_ID.getOpt());
- printUsage();
- return null;
- }
-
- String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
- GlobalConfiguration.loadConfiguration(confDirPath);
- Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
- flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
- flinkYarnClient.setConfigurationDirectory(confDirPath);
-
- try {
- return flinkYarnClient.attach(cmd.getOptionValue(APPLICATION_ID.getOpt()));
- } catch (Exception e) {
- LOG.error("Could not attach to YARN session", e);
- return null;
- }
- }
/**
* Resumes from a Flink Yarn properties file
* @param flinkConfiguration The flink configuration
@@ -170,7 +160,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
*/
private boolean resumeFromYarnProperties(Configuration flinkConfiguration) {
// load the YARN properties
- File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration));
+ File propertiesFile = getYarnPropertiesLocation(flinkConfiguration);
if (!propertiesFile.exists()) {
return false;
}
@@ -209,7 +199,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
try {
jobManagerAddress = ClientUtils.parseHostPortAddress(address);
// store address in config from where it is retrieved by the retrieval service
- CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress);
+ CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress);
}
catch (Exception e) {
throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e);
@@ -228,10 +218,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
return true;
}
- public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
-
+ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
- YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor();
+ AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor();
if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
LOG.error("Missing required argument {}", CONTAINER.getOpt());
@@ -343,19 +332,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
return yarnClusterDescriptor;
}
- @Override
- public YarnClusterClient createClient(String applicationName, CommandLine cmdLine) throws Exception {
-
- YarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
-
- try {
- return yarnClusterDescriptor.deploy();
- } catch (Exception e) {
- throw new RuntimeException("Error deploying the YARN cluster", e);
- }
-
- }
-
private void printUsage() {
System.out.println("Usage:");
HelpFormatter formatter = new HelpFormatter();
@@ -367,17 +343,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
formatter.printHelp(" ", req);
formatter.setSyntaxPrefix(" Optional");
- Options opt = new Options();
- opt.addOption(JM_MEMORY);
- opt.addOption(TM_MEMORY);
- opt.addOption(QUERY);
- opt.addOption(QUEUE);
- opt.addOption(SLOTS);
- opt.addOption(DYNAMIC_PROPERTIES);
- opt.addOption(DETACHED);
- opt.addOption(STREAMING);
- opt.addOption(NAME);
- formatter.printHelp(" ", opt);
+ Options options = new Options();
+ addGeneralOptions(options);
+ addRunOptions(options);
+ formatter.printHelp(" ", options);
}
private static void writeYarnProperties(Properties properties, File propertiesFile) {
@@ -439,6 +408,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
switch (command) {
case "quit":
case "stop":
+ yarnCluster.shutdownCluster();
break label;
case "help":
@@ -466,38 +436,62 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
@Override
- public String getIdentifier() {
+ public boolean isActive(CommandLine commandLine, Configuration configuration) {
+ String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
+ boolean yarnJobManager = ID.equals(jobManagerOption);
+ return yarnJobManager || resumeFromYarnProperties(configuration);
+ }
+
+ @Override
+ public String getId() {
return ID;
}
- public void addOptions(Options options) {
- options.addOption(FLINK_JAR);
- options.addOption(JM_MEMORY);
- options.addOption(TM_MEMORY);
- options.addOption(CONTAINER);
- options.addOption(QUEUE);
- options.addOption(QUERY);
- options.addOption(SHIP_PATH);
- options.addOption(SLOTS);
- options.addOption(DYNAMIC_PROPERTIES);
- options.addOption(DETACHED);
- options.addOption(STREAMING);
- options.addOption(NAME);
+ @Override
+ public void addRunOptions(Options baseOptions) {
+ for (Object option : ALL_OPTIONS.getOptions()) {
+ baseOptions.addOption((Option) option);
+ }
}
+ @Override
+ public void addGeneralOptions(Options baseOptions) {
+ baseOptions.addOption(APPLICATION_ID);
+ }
- public void getYARNAttachCLIOptions(Options options) {
- options.addOption(APPLICATION_ID);
+ @Override
+ public YarnClusterClient retrieveCluster(
+ CommandLine cmdLine,
+ Configuration config) throws UnsupportedOperationException {
+
+ // first check for an application id
+ if (cmdLine.hasOption(APPLICATION_ID.getOpt())) {
+ String applicationID = cmdLine.getOptionValue(APPLICATION_ID.getOpt());
+ AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+ yarnDescriptor.setFlinkConfiguration(config);
+ return yarnDescriptor.retrieve(applicationID);
+ // then try to load from yarn properties
+ } else if (resumeFromYarnProperties(config)) {
+ AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+ yarnDescriptor.setFlinkConfiguration(config);
+ return yarnDescriptor.retrieveFromConfig(config);
+ }
+
+ throw new UnsupportedOperationException("Could not resume a Yarn cluster.");
}
@Override
- public ClusterClient retrieveCluster(Configuration config) throws Exception {
+ public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration config) {
+
+ AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
+ yarnClusterDescriptor.setFlinkConfiguration(config);
- if(resumeFromYarnProperties(config)) {
- return new StandaloneClusterClient(config);
+ try {
+ return yarnClusterDescriptor.deploy();
+ } catch (Exception e) {
+ throw new RuntimeException("Error deploying the YARN cluster", e);
}
- return null;
}
public int run(String[] args) {
@@ -505,7 +499,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// Command Line Options
//
Options options = new Options();
- addOptions(options);
+ addGeneralOptions(options);
+ addRunOptions(options);
CommandLineParser parser = new PosixParser();
CommandLine cmd;
@@ -519,10 +514,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// Query cluster for metrics
if (cmd.hasOption(QUERY.getOpt())) {
- YarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
+ AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
String description;
try {
- description = flinkYarnClient.getClusterDescription();
+ description = yarnDescriptor.getClusterDescription();
} catch (Exception e) {
System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
e.printStackTrace(System.err);
@@ -531,56 +526,61 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
System.out.println(description);
return 0;
} else if (cmd.hasOption(APPLICATION_ID.getOpt())) {
- yarnCluster = attachFlinkYarnClient(cmd);
+
+ AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+ try {
+ yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(APPLICATION_ID.getOpt()));
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve existing Yarn application", e);
+ }
if (detachedMode) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
- "yarn application -kill "+yarnCluster.getApplicationId());
+ "yarn application -kill "+yarnCluster.getClusterIdentifier());
+ yarnCluster.disconnect();
} else {
- runInteractiveCli(yarnCluster);
-
- if (!yarnCluster.hasBeenStopped()) {
- LOG.info("Command Line Interface requested session shutdown");
- yarnCluster.shutdown(false);
- }
+ runInteractiveCli(yarnCluster, true);
}
} else {
- YarnClusterDescriptor flinkYarnClient;
+ AbstractYarnClusterDescriptor yarnDescriptor;
try {
- flinkYarnClient = createDescriptor(null, cmd);
+ yarnDescriptor = createDescriptor(null, cmd);
} catch (Exception e) {
System.err.println("Error while starting the YARN Client. Please check log output!");
return 1;
}
try {
- yarnCluster = flinkYarnClient.deploy();
+ yarnCluster = yarnDescriptor.deploy();
} catch (Exception e) {
System.err.println("Error while deploying YARN cluster: "+e.getMessage());
e.printStackTrace(System.err);
return 1;
}
//------------------ ClusterClient deployed, handle connection details
- String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
+ String jobManagerAddress =
+ yarnCluster.getJobManagerAddress().getAddress().getHostAddress() +
+ ":" + yarnCluster.getJobManagerAddress().getPort();
+
System.out.println("Flink JobManager is now running on " + jobManagerAddress);
System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
// file that we write into the conf/ dir containing the jobManager address and the dop.
- File yarnPropertiesFile = new File(getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
+ File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration());
Properties yarnProps = new Properties();
yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
- if (flinkYarnClient.getTaskManagerSlots() != -1) {
+ if (yarnDescriptor.getTaskManagerSlots() != -1) {
String parallelism =
- Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
+ Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount());
yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
}
// add dynamic properties
- if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
+ if (yarnDescriptor.getDynamicPropertiesEncoded() != null) {
yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
- flinkYarnClient.getDynamicPropertiesEncoded());
+ yarnDescriptor.getDynamicPropertiesEncoded());
}
writeYarnProperties(yarnProps, yarnPropertiesFile);
@@ -592,21 +592,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" +
"Please also note that the temporary files of the YARN session in {} will not be removed.",
- flinkYarnClient.getSessionFilesDir());
+ yarnDescriptor.getSessionFilesDir());
yarnCluster.disconnect();
} else {
runInteractiveCli(yarnCluster, acceptInteractiveInput);
-
- if (!yarnCluster.hasBeenShutdown()) {
- LOG.info("Command Line Interface requested session shutdown");
- yarnCluster.shutdown();
- }
-
- try {
- yarnPropertiesFile.delete();
- } catch (Exception e) {
- LOG.warn("Exception while deleting the JobManager address file", e);
- }
}
}
return 0;
@@ -649,11 +638,16 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
}
- private static String getYarnPropertiesLocation(Configuration conf) {
+ public static File getYarnPropertiesLocation(Configuration conf) {
String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
String currentUser = System.getProperty("user.name");
- String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+ String propertiesFileLocation =
+ conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+
+ return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
+ }
- return propertiesFileLocation + File.separator + YARN_PROPERTIES_FILE + currentUser;
+ protected AbstractYarnClusterDescriptor getClusterDescriptor() {
+ return new YarnClusterDescriptor();
}
}