You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:08 UTC
[49/52] [abbrv] flink git commit: [FLINK-4930] [client,
yarn] Implement FLIP-6 YARN client
[FLINK-4930] [client, yarn] Implement FLIP-6 YARN client
Summary: Implement FLIP-6 YARN client
Test Plan: NA
Reviewers: biao.liub
Differential Revision: http://phabricator.taobao.net/D6563
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3695a8e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3695a8e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3695a8e9
Branch: refs/heads/master
Commit: 3695a8e92e9c3deee368d9cc3ce89a5ab117d6a1
Parents: 2a7dbda
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Wed Nov 23 17:19:35 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:27 2016 +0100
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 1 +
.../yarn/AbstractYarnClusterDescriptor.java | 101 +++++---
.../apache/flink/yarn/YarnClusterClientV2.java | 169 +++++++++++++
.../flink/yarn/YarnClusterDescriptorV2.java | 34 +++
.../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 253 +++++++++++++++++++
5 files changed, 524 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 1ec0674..dc3280e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -131,6 +131,7 @@ public class CliFrontend {
/** command line interface of the YARN session, with a special initialization here
* to prefix all options with y/yarn. */
loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
+ loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnCLI", "y", "yarn");
customCommandLine.add(new DefaultCLI());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ca18439..b4c87b8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.hadoop.conf.Configuration;
@@ -60,6 +61,8 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.io.FileOutputStream;
+import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
@@ -460,28 +463,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
}
- // ------------------ Set default file system scheme -------------------------
-
- try {
- org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
- } catch (IOException e) {
- throw new IOException("Error while setting the default " +
- "filesystem scheme from configuration.", e);
- }
-
- // initialize file system
- // Copy the application master jar to the filesystem
- // Create a local resource to point to the destination jar path
- final FileSystem fs = FileSystem.get(conf);
-
- // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
- if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
- fs.getScheme().startsWith("file")) {
- LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
- + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
- + "The Flink YARN client needs to store its files in a distributed file system");
- }
-
// ------------------ Check if the YARN ClusterClient has the requested resources --------------
// the yarnMinAllocationMB specifies the smallest possible container allocation size.
@@ -505,6 +486,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+ ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
Resource maxRes = appResponse.getMaximumResourceCapability();
final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
@@ -560,6 +542,45 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
+ ApplicationReport report = startAppMaster(null, yarnClient);
+
+ String host = report.getHost();
+ int port = report.getRpcPort();
+
+ // Correctly initialize the Flink config
+ flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
+ flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+
+ // the Flink cluster is deployed in YARN. Represent cluster
+ return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
+ }
+
+ public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient) throws Exception {
+
+ // ------------------ Set default file system scheme -------------------------
+
+ try {
+ org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
+ } catch (IOException e) {
+ throw new IOException("Error while setting the default " +
+ "filesystem scheme from configuration.", e);
+ }
+
+ // initialize file system
+ // Copy the application master jar to the filesystem
+ // Create a local resource to point to the destination jar path
+ final FileSystem fs = FileSystem.get(conf);
+
+ // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
+ if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+ fs.getScheme().startsWith("file")) {
+ LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ + "The Flink YARN client needs to store its files in a distributed file system");
+ }
+
+ final YarnClientApplication yarnApplication = yarnClient.createApplication();
+ ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
effectiveShipFiles.add(file.getAbsoluteFile());
@@ -596,8 +617,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
effectiveShipFiles.addAll(userJarFiles);
}
+
// Set-up ApplicationSubmissionContext for the application
- ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
final ApplicationId appId = appContext.getApplicationId();
@@ -694,6 +715,27 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
paths.add(remotePathConf);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
+ // write job graph to tmp file and add it to local resource
+ // TODO: need refine ?
+ if (jobGraph != null) {
+ try {
+ File fp = new File("/tmp/jobgraph-" + appId.toString());
+ FileOutputStream input = new FileOutputStream(fp);
+ ObjectOutputStream obInput = new ObjectOutputStream(input);
+ obInput.writeObject(jobGraph);
+ input.close();
+ LocalResource jobgraph = Records.newRecord(LocalResource.class);
+ Path remoteJobGraph =
+ Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
+ localResources.put("job.graph", jobgraph);
+ paths.add(remoteJobGraph);
+ classPathBuilder.append("job.graph").append(File.pathSeparator);
+ } catch (Exception e) {
+ LOG.warn("Add job graph to local resource fail");
+ throw e;
+ }
+ }
+
sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
@@ -835,7 +877,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
LOG.debug("Application State: {}", appState);
switch(appState) {
case FAILED:
- case FINISHED:
+ case FINISHED: //TODO: the finished state may be valid in flip-6
case KILLED:
throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
+ appState + " during deployment. \n" +
@@ -871,16 +913,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
} catch (IllegalStateException e) {
// we're already in the shut down hook.
}
-
- String host = report.getHost();
- int port = report.getRpcPort();
-
- // Correctly initialize the Flink config
- flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
- flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
-
- // the Flink cluster is deployed in YARN. Represent cluster
- return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
+ return report;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
new file mode 100644
index 0000000..daa2c3b
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Client for running a single Flink job as its own YARN application (FLIP-6 per-job mode).
+ *
+ * <p>Under FLIP-6 every job has its own job master, so submitting a job through this client
+ * asks the {@link AbstractYarnClusterDescriptor} to start a new YARN application master and
+ * passes the job graph along with that request. Attached execution is not supported yet; the
+ * session-oriented operations inherited from {@link ClusterClient} throw
+ * {@link UnsupportedOperationException}.
+ */
+public class YarnClusterClientV2 extends ClusterClient {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClientV2.class);
+
+	/** YARN client obtained from the cluster descriptor. */
+	private final YarnClient yarnClient;
+
+	private final AbstractYarnClusterDescriptor clusterDescriptor;
+
+	/** Id of the YARN application started by {@link #submitJob}; {@code null} before submission. */
+	private ApplicationId appId;
+
+	/** Tracking URL reported by YARN; the empty string before submission. */
+	private String trackingURL;
+
+	/**
+	 * Create a client to communicate with YARN cluster.
+	 *
+	 * @param clusterDescriptor The descriptor used to create yarn job
+	 * @param flinkConfig Flink configuration
+	 * @throws java.io.IOException if the parent {@link ClusterClient} cannot be initialized
+	 */
+	public YarnClusterClientV2(
+			final AbstractYarnClusterDescriptor clusterDescriptor,
+			org.apache.flink.configuration.Configuration flinkConfig) throws IOException {
+
+		super(flinkConfig);
+
+		this.clusterDescriptor = clusterDescriptor;
+		this.yarnClient = clusterDescriptor.getYarnClient();
+		this.trackingURL = "";
+	}
+
+	@Override
+	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+		return flinkConfig;
+	}
+
+	@Override
+	public int getMaxSlots() {
+		// Now need not set max slot
+		return 0;
+	}
+
+	@Override
+	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+		return clusterDescriptor.hasUserJarFiles(userJarFiles);
+	}
+
+	/**
+	 * Starts a YARN application master for the given job graph and reports where its status
+	 * can be followed.
+	 *
+	 * @param jobGraph the job to run
+	 * @param classLoader not used by this implementation
+	 * @return a submission result carrying the job id; the job is not awaited
+	 * @throws ProgramInvocationException if the application master could not be started or the
+	 *                                    application is not in state RUNNING afterwards
+	 */
+	@Override
+	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		try {
+			// Create application via yarnClient
+			ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient);
+			if (!YarnApplicationState.RUNNING.equals(report.getYarnApplicationState())) {
+				throw new ProgramInvocationException("Fail to submit the job.");
+			}
+
+			appId = report.getApplicationId();
+			trackingURL = report.getTrackingUrl();
+			logAndSysout("Please refer to " + getWebInterfaceURL()
+				+ " for the running status of job " + jobGraph.getJobID().toString());
+			//TODO: not support attach mode now
+			return new JobSubmissionResult(jobGraph.getJobID());
+		}
+		catch (ProgramInvocationException e) {
+			// already the right type; re-wrapping would hide its message
+			throw e;
+		}
+		catch (Exception e) {
+			// chain the exception itself: e.getCause() is frequently null and would
+			// discard the actual failure together with its stack trace
+			throw new ProgramInvocationException("Fail to submit the job", e);
+		}
+	}
+
+	/**
+	 * Returns the YARN tracking URL with a scheme.
+	 *
+	 * @return the tracking URL unchanged if it already starts with {@code http://} or
+	 *         {@code https://}, otherwise the tracking URL prefixed with {@code http://}
+	 */
+	@Override
+	public String getWebInterfaceURL() {
+		// there seems to be a difference between HD 2.2.0 and 2.6.0
+		if (trackingURL.startsWith("http://") || trackingURL.startsWith("https://")) {
+			return trackingURL;
+		}
+		return "http://" + trackingURL;
+	}
+
+	@Override
+	public String getClusterIdentifier() {
+		return "Yarn cluster with application id " + getApplicationId();
+	}
+
+	/**
+	 * Not supported by this client.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public GetClusterStatusResponse getClusterStatus() {
+		throw new UnsupportedOperationException("Not support getClusterStatus since Flip-6.");
+	}
+
+	/**
+	 * Placeholder for a future status query.
+	 *
+	 * @return currently always {@code null}
+	 */
+	public ApplicationStatus getApplicationStatus() {
+		//TODO: this method is useful for later
+		return null;
+	}
+
+	/**
+	 * Not supported by this client.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public List<String> getNewMessages() {
+		throw new UnsupportedOperationException("Not support getNewMessages since Flip-6.");
+	}
+
+	/**
+	 * Not supported by this client.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public void finalizeCluster() {
+		throw new UnsupportedOperationException("Not support finalizeCluster since Flip-6.");
+	}
+
+	@Override
+	public boolean isDetached() {
+		return super.isDetached() || clusterDescriptor.isDetachedMode();
+	}
+
+	/**
+	 * Not supported by this client.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public void waitForClusterToBeReady() {
+		throw new UnsupportedOperationException("Not support waitForClusterToBeReady since Flip-6.");
+	}
+
+	@Override
+	public InetSocketAddress getJobManagerAddress() {
+		//TODO: just return a local address in order to be compatible with createClient in CliFrontend
+		return new InetSocketAddress("localhost", 0);
+	}
+
+	/**
+	 * @return the id of the YARN application started by the last successful submission,
+	 *         or {@code null} if no job has been submitted yet
+	 */
+	public ApplicationId getApplicationId() {
+		return appId;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
new file mode 100644
index 0000000..e3bd944
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+
+/**
+ * {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} variant that starts the
+ * per-job application master introduced with flip-6.
+ *
+ * <p>The class hierarchy is a compromise: this descriptor belongs together with
+ * YarnClusterClientV2, whereas AbstractYarnClusterDescriptor is tied to YarnClusterClient.
+ * Ideally this class would implement ClusterDescriptor&lt;YarnClusterClientV2&gt; directly.
+ * It extends AbstractYarnClusterDescriptor anyway so that the existing code for setting up
+ * the environment and related steps can be reused.
+ */
+public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
+
+	/**
+	 * @return the class used as application master, {@code YarnFlinkApplicationMasterRunner}
+	 */
+	@Override
+	protected Class<?> getApplicationMasterClass() {
+		return YarnFlinkApplicationMasterRunner.class;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
new file mode 100644
index 0000000..ca5049c
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnClusterClientV2;
+import org.apache.flink.yarn.YarnClusterDescriptorV2;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+
+/**
+ * Class handling the command line interface to the YARN per job mode under flip-6.
+ *
+ * <p>The options are created with caller-supplied short/long prefixes, and the descriptor
+ * built from a parsed command line always runs in detached mode for now.
+ */
+public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCLI.class);
+
+	/** The id for the CommandLine interface */
+	private static final String ID = "yarn";
+
+	private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
+
+	//------------------------------------ Command Line argument options -------------------------
+	// the prefix transformation is used by the CliFrontend static constructor.
+	private final Option QUEUE;
+	private final Option SHIP_PATH;
+	private final Option FLINK_JAR;
+	private final Option JM_MEMORY;
+	/** Despite the field name, this is the "attached" switch ({@code a} / {@code attached}). */
+	private final Option DETACHED;
+	private final Option ZOOKEEPER_NAMESPACE;
+
+	private final Options ALL_OPTIONS;
+
+	/**
+	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
+	 *  -D fs.overwrite-files=true  -D taskmanager.network.numberOfBuffers=16368
+	 */
+	private final Option DYNAMIC_PROPERTIES;
+
+	//------------------------------------ Internal fields -------------------------
+	// use detach mode as default
+	private boolean detachedMode = true;
+
+	/**
+	 * Creates the command line options, each prefixed as requested.
+	 *
+	 * @param shortPrefix prefix prepended to every short option name
+	 * @param longPrefix prefix prepended to every long option name
+	 */
+	public FlinkYarnCLI(String shortPrefix, String longPrefix) {
+
+		QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
+		SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+		FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
+		JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+		DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
+		DETACHED = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached");
+		ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
+
+		ALL_OPTIONS = new Options();
+		ALL_OPTIONS.addOption(FLINK_JAR);
+		ALL_OPTIONS.addOption(JM_MEMORY);
+		ALL_OPTIONS.addOption(QUEUE);
+		ALL_OPTIONS.addOption(SHIP_PATH);
+		ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES);
+		ALL_OPTIONS.addOption(DETACHED);
+		ALL_OPTIONS.addOption(ZOOKEEPER_NAMESPACE);
+	}
+
+	/**
+	 * Builds a cluster descriptor from the parsed command line.
+	 *
+	 * @param defaultApplicationName name to give the YARN application; ignored if {@code null}
+	 * @param cmd the parsed command line
+	 * @return the configured descriptor
+	 * @throws RuntimeException if no jar option was given and the location of the Flink jar
+	 *                          could not be URL-decoded
+	 */
+	public YarnClusterDescriptorV2 createDescriptor(String defaultApplicationName, CommandLine cmd) {
+
+		YarnClusterDescriptorV2 yarnClusterDescriptor = new YarnClusterDescriptorV2();
+
+		// Jar Path
+		Path localJarPath;
+		if (cmd.hasOption(FLINK_JAR.getOpt())) {
+			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
+			if (!userPath.startsWith("file://")) {
+				userPath = "file://" + userPath;
+			}
+			localJarPath = new Path(userPath);
+		} else {
+			LOG.info("No path for the flink jar passed. Using the location of "
+				+ yarnClusterDescriptor.getClass() + " to locate the jar");
+			String encodedJarPath =
+				yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+			try {
+				// we have to decode the url encoded parts of the path
+				String decodedPath = URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
+				localJarPath = new Path(new File(decodedPath).toURI());
+			} catch (UnsupportedEncodingException e) {
+				// keep the original exception as cause so the failing charset is visible
+				throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
+					" Please supply a path manually via the -" + FLINK_JAR.getOpt() + " option.", e);
+			}
+		}
+
+		yarnClusterDescriptor.setLocalJarPath(localJarPath);
+
+		List<File> shipFiles = new ArrayList<>();
+		// path to directory to ship
+		if (cmd.hasOption(SHIP_PATH.getOpt())) {
+			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
+			File shipDir = new File(shipPath);
+			if (shipDir.isDirectory()) {
+				shipFiles.add(shipDir);
+			} else {
+				LOG.warn("Ship directory is not a directory. Ignoring it.");
+			}
+		}
+
+		yarnClusterDescriptor.addShipFiles(shipFiles);
+
+		// queue
+		if (cmd.hasOption(QUEUE.getOpt())) {
+			yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
+		}
+
+		// JobManager Memory
+		if (cmd.hasOption(JM_MEMORY.getOpt())) {
+			int jmMemory = Integer.parseInt(cmd.getOptionValue(JM_MEMORY.getOpt()));
+			yarnClusterDescriptor.setJobManagerMemory(jmMemory);
+		}
+
+		String[] dynamicProperties = null;
+		if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
+			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+		}
+		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+
+		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
+
+		if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
+			// TODO: not support non detach mode now. Both switches are recognised but
+			// deliberately have no effect; detachedMode keeps its default of true.
+		}
+		yarnClusterDescriptor.setDetachedMode(this.detachedMode);
+
+		if (defaultApplicationName != null) {
+			yarnClusterDescriptor.setName(defaultApplicationName);
+		}
+
+		if (cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt())) {
+			String zookeeperNamespace = cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt());
+			yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace);
+		}
+
+		return yarnClusterDescriptor;
+	}
+
+	/**
+	 * Prints the general and run options of this command line to standard out.
+	 */
+	private void printUsage() {
+		System.out.println("Usage:");
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setWidth(200);
+		formatter.setLeftPadding(5);
+
+		formatter.setSyntaxPrefix("   Optional");
+		Options options = new Options();
+		addGeneralOptions(options);
+		addRunOptions(options);
+		formatter.printHelp(" ", options);
+	}
+
+	/**
+	 * This command line is active when the address option's value equals the id of this CLI.
+	 */
+	@Override
+	public boolean isActive(CommandLine commandLine, Configuration configuration) {
+		String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
+		return ID.equals(jobManagerOption);
+	}
+
+	@Override
+	public String getId() {
+		return ID;
+	}
+
+	@Override
+	public void addRunOptions(Options baseOptions) {
+		for (Object option : ALL_OPTIONS.getOptions()) {
+			baseOptions.addOption((Option) option);
+		}
+	}
+
+	@Override
+	public void addGeneralOptions(Options baseOptions) {
+		// this command line contributes no general options
+	}
+
+	/**
+	 * Not supported by this command line.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public YarnClusterClientV2 retrieveCluster(
+			CommandLine cmdLine,
+			Configuration config) throws UnsupportedOperationException {
+
+		throw new UnsupportedOperationException("Not support retrieveCluster since Flip-6.");
+	}
+
+	/**
+	 * Creates the descriptor for the given command line and wraps it in a client.
+	 *
+	 * @param applicationName name for the YARN application; may be {@code null}
+	 * @param cmdLine the parsed command line
+	 * @param config the Flink configuration handed to descriptor and client
+	 * @param userJarFiles user jars to provide; must not be {@code null}
+	 * @return the client; no YARN application has been started by this method itself
+	 * @throws RuntimeException if the client could not be created
+	 */
+	@Override
+	public YarnClusterClientV2 createCluster(
+			String applicationName,
+			CommandLine cmdLine,
+			Configuration config,
+			List<URL> userJarFiles) {
+		Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
+
+		YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
+		yarnClusterDescriptor.setFlinkConfiguration(config);
+		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
+
+		try {
+			return new YarnClusterClientV2(yarnClusterDescriptor, config);
+		}
+		catch (IOException e) {
+			// chain the IOException itself; its cause is usually null and would lose the error
+			throw new RuntimeException("Fail to create YarnClusterClientV2", e);
+		}
+	}
+
+	/**
+	 * Utility method
+	 */
+	private void logAndSysout(String message) {
+		LOG.info(message);
+		System.out.println(message);
+	}
+
+}