You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:08 UTC

[49/52] [abbrv] flink git commit: [FLINK-4930] [client, yarn] Implement FLIP-6 YARN client

[FLINK-4930] [client, yarn] Implement FLIP-6 YARN client

Summary: Implement FLIP-6 YARN client

Test Plan: NA

Reviewers: biao.liub

Differential Revision: http://phabricator.taobao.net/D6563


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3695a8e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3695a8e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3695a8e9

Branch: refs/heads/master
Commit: 3695a8e92e9c3deee368d9cc3ce89a5ab117d6a1
Parents: 2a7dbda
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Wed Nov 23 17:19:35 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:27 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   1 +
 .../yarn/AbstractYarnClusterDescriptor.java     | 101 +++++---
 .../apache/flink/yarn/YarnClusterClientV2.java  | 169 +++++++++++++
 .../flink/yarn/YarnClusterDescriptorV2.java     |  34 +++
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 253 +++++++++++++++++++
 5 files changed, 524 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 1ec0674..dc3280e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -131,6 +131,7 @@ public class CliFrontend {
 		/** command line interface of the YARN session, with a special initialization here
 		 *  to prefix all options with y/yarn. */
 		loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
+		loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnCLI", "y", "yarn");
 		customCommandLine.add(new DefaultCLI());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ca18439..b4c87b8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -60,6 +61,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.FileOutputStream;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URISyntaxException;
@@ -460,28 +463,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
 		}
 
-		// ------------------ Set default file system scheme -------------------------
-
-		try {
-			org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
-		} catch (IOException e) {
-			throw new IOException("Error while setting the default " +
-				"filesystem scheme from configuration.", e);
-		}
-
-		// initialize file system
-		// Copy the application master jar to the filesystem
-		// Create a local resource to point to the destination jar path
-		final FileSystem fs = FileSystem.get(conf);
-
-		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
-		if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
-			fs.getScheme().startsWith("file")) {
-			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
-				+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
-				+ "The Flink YARN client needs to store its files in a distributed file system");
-		}
-
 		// ------------------ Check if the YARN ClusterClient has the requested resources --------------
 
 		// the yarnMinAllocationMB specifies the smallest possible container allocation size.
@@ -505,6 +486,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// Create application via yarnClient
 		final YarnClientApplication yarnApplication = yarnClient.createApplication();
 		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
 
 		Resource maxRes = appResponse.getMaximumResourceCapability();
 		final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
@@ -560,6 +542,45 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
+		ApplicationReport report = startAppMaster(null, yarnClient);
+
+		String host = report.getHost();
+		int port = report.getRpcPort();
+
+		// Correctly initialize the Flink config
+		flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
+		flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+
+		// the Flink cluster is deployed in YARN. Represent cluster
+		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
+	}
+
+	public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient) throws Exception {
+
+		// ------------------ Set default file system scheme -------------------------
+
+		try {
+			org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
+		} catch (IOException e) {
+			throw new IOException("Error while setting the default " +
+					"filesystem scheme from configuration.", e);
+		}
+
+		// initialize file system
+		// Copy the application master jar to the filesystem
+		// Create a local resource to point to the destination jar path
+		final FileSystem fs = FileSystem.get(conf);
+
+		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
+		if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+				fs.getScheme().startsWith("file")) {
+			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+					+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+					+ "The Flink YARN client needs to store its files in a distributed file system");
+		}
+
+		final YarnClientApplication yarnApplication = yarnClient.createApplication();
+		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
 		Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size());
 		for (File file : shipFiles) {
 			effectiveShipFiles.add(file.getAbsoluteFile());
@@ -596,8 +617,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			effectiveShipFiles.addAll(userJarFiles);
 		}
 
+
 		// Set-up ApplicationSubmissionContext for the application
-		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
 
 		final ApplicationId appId = appContext.getApplicationId();
 
@@ -694,6 +715,27 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		paths.add(remotePathConf);
 		classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
 
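+		// Serialize the job graph to a local temp file and register it as a YARN local resource.
+		// TODO: refine this - the /tmp path is hard-coded and the streams are not closed if writing fails.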
+		if (jobGraph != null) {
+			try {
+				File fp = new File("/tmp/jobgraph-" + appId.toString());
+				FileOutputStream input = new FileOutputStream(fp);
+				ObjectOutputStream obInput = new ObjectOutputStream(input);
+				obInput.writeObject(jobGraph);
+				input.close();
+				LocalResource jobgraph = Records.newRecord(LocalResource.class);
+				Path remoteJobGraph =
+						Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
+				localResources.put("job.graph", jobgraph);
+				paths.add(remoteJobGraph);
+				classPathBuilder.append("job.graph").append(File.pathSeparator);
+			} catch (Exception e) {
+				LOG.warn("Add job graph to local resource fail");
+				throw e;
+			}
+		}
+
 		sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
 
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
@@ -835,7 +877,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			LOG.debug("Application State: {}", appState);
 			switch(appState) {
 				case FAILED:
-				case FINISHED:
+				case FINISHED: //TODO: the finished state may be valid in flip-6
 				case KILLED:
 					throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
 						+ appState + " during deployment. \n" +
@@ -871,16 +913,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		} catch (IllegalStateException e) {
 			// we're already in the shut down hook.
 		}
-
-		String host = report.getHost();
-		int port = report.getRpcPort();
-
-		// Correctly initialize the Flink config
-		flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
-		flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
-
-		// the Flink cluster is deployed in YARN. Represent cluster
-		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
+		return report;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
new file mode 100644
index 0000000..daa2c3b
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Java representation of a running Flink job on YARN.
+ * Since FLIP-6, a Flink job is run as a YARN job by default and each job has its own JobMaster,
+ * so this class is used as a client that communicates with YARN and starts the job on YARN.
+ */
+public class YarnClusterClientV2 extends ClusterClient {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClientV2.class);
+
+	private YarnClient yarnClient;
+
+	private final AbstractYarnClusterDescriptor clusterDescriptor;
+
+	private ApplicationId appId;
+
+	private String trackingURL;
+
+	/**
+	 * Create a client to communicate with YARN cluster.
+	 *
+	 * @param clusterDescriptor The descriptor used to create yarn job
+	 * @param flinkConfig Flink configuration
+	 * @throws java.io.IOException
+	 */
+	public YarnClusterClientV2(
+			final AbstractYarnClusterDescriptor clusterDescriptor,
+			org.apache.flink.configuration.Configuration flinkConfig) throws IOException {
+
+		super(flinkConfig);
+
+		this.clusterDescriptor = clusterDescriptor;
+		this.yarnClient = clusterDescriptor.getYarnClient();
+		this.trackingURL = "";
+	}
+
+	@Override
+	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+		return flinkConfig;
+	}
+
+	@Override
+	public int getMaxSlots() {
+		// The maximum number of slots does not need to be set for now.
+		return 0;
+	}
+
+	@Override
+	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+		return clusterDescriptor.hasUserJarFiles(userJarFiles);
+	}
+
+	/** Starts the application master for the given job graph; returns only if the report state is RUNNING. */
+	@Override
+	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		try {
+			// Create application via yarnClient
+			ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient);
+			if (!YarnApplicationState.RUNNING.equals(report.getYarnApplicationState())) {
+				throw new ProgramInvocationException("Fail to submit the job.");
+			}
+			appId = report.getApplicationId();
+			trackingURL = report.getTrackingUrl();
+			logAndSysout("Please refer to " + getWebInterfaceURL() + " for the running status of job " + jobGraph.getJobID().toString());
+			//TODO: not support attach mode now
+			return new JobSubmissionResult(jobGraph.getJobID());
+		} catch (ProgramInvocationException e) {
+			throw e; // already the right type; re-wrapping would only bury its message
+		} catch (Exception e) {
+			// Chain e itself: e.getCause() may be null, which would discard the real failure.
+			throw new ProgramInvocationException("Fail to submit the job", e);
+		}
+	}
+
+	@Override
+	public String getWebInterfaceURL() {
+		// there seems to be a difference between HD 2.2.0 and 2.6.0
+		if(!trackingURL.startsWith("http://")) {
+			return "http://" + trackingURL;
+		} else {
+			return trackingURL;
+		}
+	}
+
+	@Override
+	public String getClusterIdentifier() {
+		return "Yarn cluster with application id " + getApplicationId();
+	}
+
+	/**
+	 * Not supported by this client: always throws {@link UnsupportedOperationException}.
+	 */
+	@Override
+	public GetClusterStatusResponse getClusterStatus() {
+		throw new UnsupportedOperationException("Not support getClusterStatus since Flip-6.");
+	}
+
+	public ApplicationStatus getApplicationStatus() {
+		//TODO: this method is useful for later
+		return null;
+	}
+
+	@Override
+	public List<String> getNewMessages() {
+		throw new UnsupportedOperationException("Not support getNewMessages since Flip-6.");
+	}
+
+	@Override
+	public void finalizeCluster() {
+		throw new UnsupportedOperationException("Not support finalizeCluster since Flip-6.");
+	}
+
+	@Override
+	public boolean isDetached() {
+		return super.isDetached() || clusterDescriptor.isDetachedMode();
+	}
+
+	@Override
+	public void waitForClusterToBeReady() {
+		throw new UnsupportedOperationException("Not support waitForClusterToBeReady since Flip-6.");
+	}
+
+	@Override
+	public InetSocketAddress getJobManagerAddress() {
+		//TODO: just return a local address in order to be compatible with createClient in CliFrontend
+		return new InetSocketAddress("localhost", 0);
+	}
+
+	public ApplicationId getApplicationId() {
+		return appId;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
new file mode 100644
index 0000000..e3bd944
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+
+/**
+ * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the new application master for a job under flip-6.
+ * This implementation is currently somewhat tricky: YarnClusterDescriptorV2 is related to YarnClusterClientV2, but AbstractYarnClusterDescriptor is related
+ * to YarnClusterClient. Ideally, YarnClusterDescriptorV2 would implement {@code ClusterDescriptor<YarnClusterClientV2>}.
+ * However, in order to reuse the code in AbstractYarnClusterDescriptor for setting up the environment and so on, YarnClusterDescriptorV2 extends it for now.
+ */
+public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
+
+	@Override
+	protected Class<?> getApplicationMasterClass() {
+		return YarnFlinkApplicationMasterRunner.class;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
new file mode 100644
index 0000000..ca5049c
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnClusterClientV2;
+import org.apache.flink.yarn.YarnClusterDescriptorV2;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+
+/**
+ * Class handling the command line interface to the YARN per job mode under flip-6.
+ */
+public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCLI.class);
+
+	/** The id for the CommandLine interface */
+	private static final String ID = "yarn";
+
+	private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
+
+	//------------------------------------ Command Line argument options -------------------------
+	// the prefix transformation is used by the CliFrontend static constructor.
+	private final Option QUEUE;
+	private final Option SHIP_PATH;
+	private final Option FLINK_JAR;
+	private final Option JM_MEMORY;
+	private final Option DETACHED;
+	private final Option ZOOKEEPER_NAMESPACE;
+
+	private final Options ALL_OPTIONS;
+
+	/**
+	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
+	 *  -D fs.overwrite-files=true  -D taskmanager.network.numberOfBuffers=16368
+	 */
+	private final Option DYNAMIC_PROPERTIES;
+
+	//------------------------------------ Internal fields -------------------------
+	// use detach mode as default
+	private boolean detachedMode = true;
+
+	public FlinkYarnCLI(String shortPrefix, String longPrefix) {
+
+		QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
+		SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+		FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
+		JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+		DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
+		DETACHED = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached");
+		ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
+
+		ALL_OPTIONS = new Options();
+		ALL_OPTIONS.addOption(FLINK_JAR);
+		ALL_OPTIONS.addOption(JM_MEMORY);
+		ALL_OPTIONS.addOption(QUEUE);
+		ALL_OPTIONS.addOption(SHIP_PATH);
+		ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES);
+		ALL_OPTIONS.addOption(DETACHED);
+		ALL_OPTIONS.addOption(ZOOKEEPER_NAMESPACE);
+	}
+
+	/** Builds a {@link YarnClusterDescriptorV2} and configures it from the YARN options present in {@code cmd}. */
+	public YarnClusterDescriptorV2 createDescriptor(String defaultApplicationName, CommandLine cmd) {
+		YarnClusterDescriptorV2 yarnClusterDescriptor = new YarnClusterDescriptorV2();
+
+		// Jar Path
+		Path localJarPath;
+		if (cmd.hasOption(FLINK_JAR.getOpt())) {
+			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
+			if (!userPath.startsWith("file://")) {
+				userPath = "file://" + userPath;
+			}
+			localJarPath = new Path(userPath);
+		} else {
+			LOG.info("No path for the flink jar passed. Using the location of "
+				+ yarnClusterDescriptor.getClass() + " to locate the jar");
+			String encodedJarPath =
+				yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+			try {
+				// we have to decode the url encoded parts of the path
+				String decodedPath = URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
+				localJarPath = new Path(new File(decodedPath).toURI());
+			} catch (UnsupportedEncodingException e) {
+				throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
+					" Please supply a path manually via the -" + FLINK_JAR.getOpt() + " option.", e);
+			}
+		}
+
+		yarnClusterDescriptor.setLocalJarPath(localJarPath);
+
+		List<File> shipFiles = new ArrayList<>();
+		// path to directory to ship
+		if (cmd.hasOption(SHIP_PATH.getOpt())) {
+			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
+			File shipDir = new File(shipPath);
+			if (shipDir.isDirectory()) {
+				shipFiles.add(shipDir);
+			} else {
+				LOG.warn("Ship directory is not a directory. Ignoring it.");
+			}
+		}
+
+		yarnClusterDescriptor.addShipFiles(shipFiles);
+
+		// queue
+		if (cmd.hasOption(QUEUE.getOpt())) {
+			yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
+		}
+
+		// JobManager Memory
+		if (cmd.hasOption(JM_MEMORY.getOpt())) {
+			int jmMemory = Integer.parseInt(cmd.getOptionValue(JM_MEMORY.getOpt()));
+			yarnClusterDescriptor.setJobManagerMemory(jmMemory);
+		}
+
+		String[] dynamicProperties = null;
+		if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
+			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+		}
+		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+
+		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
+
+		if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
+			// TODO: attached mode is not supported yet, so these flags are recognized but ignored.
+			//this.detachedMode = false;
+		}
+		yarnClusterDescriptor.setDetachedMode(this.detachedMode);
+
+		if(defaultApplicationName != null) {
+			yarnClusterDescriptor.setName(defaultApplicationName);
+		}
+
+		if (cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt())) {
+			String zookeeperNamespace = cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt());
+			yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace);
+		}
+
+		return yarnClusterDescriptor;
+	}
+
+	private void printUsage() {
+		System.out.println("Usage:");
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setWidth(200);
+		formatter.setLeftPadding(5);
+
+		formatter.setSyntaxPrefix("   Optional");
+		Options options = new Options();
+		addGeneralOptions(options);
+		addRunOptions(options);
+		formatter.printHelp(" ", options);
+	}
+
+	@Override
+	public boolean isActive(CommandLine commandLine, Configuration configuration) {
+		String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
+		boolean yarnJobManager = ID.equals(jobManagerOption);
+		return yarnJobManager;
+	}
+
+	@Override
+	public String getId() {
+		return ID;
+	}
+
+	@Override
+	public void addRunOptions(Options baseOptions) {
+		for (Object option : ALL_OPTIONS.getOptions()) {
+			baseOptions.addOption((Option) option);
+		}
+	}
+
+	@Override
+	public void addGeneralOptions(Options baseOptions) {
+	}
+
+	@Override
+	public YarnClusterClientV2 retrieveCluster(
+			CommandLine cmdLine,
+			Configuration config) throws UnsupportedOperationException {
+
+		throw new UnsupportedOperationException("Not support retrieveCluster since Flip-6.");
+	}
+
+	/**
+	 * Builds a descriptor from the command line and wraps it in a {@link YarnClusterClientV2}.
+	 * @throws RuntimeException if constructing the client fails with an {@link IOException}
+	 */
+	@Override
+	public YarnClusterClientV2 createCluster(
+			String applicationName,
+			CommandLine cmdLine,
+			Configuration config,
+			List<URL> userJarFiles) {
+		Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
+
+		YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
+		yarnClusterDescriptor.setFlinkConfiguration(config);
+		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
+		try {
+			return new YarnClusterClientV2(yarnClusterDescriptor, config);
+		} catch (IOException e) {
+			// Chain e itself: e.getCause() may be null, which would drop the real error.
+			throw new RuntimeException("Fail to create YarnClusterClientV2", e);
+		}
+	}
+
+	/**
+	 * Utility method
+	 */
+	private void logAndSysout(String message) {
+		LOG.info(message);
+		System.out.println(message);
+	}
+
+}