You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/31 16:26:05 UTC

flink git commit: [FLINK-8330] [flip6] Remove FlinkYarnCLI

Repository: flink
Updated Branches:
  refs/heads/master 8e7a71c05 -> ce62945ae


[FLINK-8330] [flip6] Remove FlinkYarnCLI

The FlinkYarnCLI is not needed and is therefore being removed.

This closes #5217.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce62945a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce62945a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce62945a

Branch: refs/heads/master
Commit: ce62945ae6c1b1878bebc9a528e63ef7a54cb897
Parents: 8e7a71c
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 8 10:12:23 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Dec 31 17:25:38 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   7 -
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 244 -------------------
 2 files changed, 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce62945a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 07c6d65..c535783 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -128,19 +128,12 @@ public class CliFrontend {
 		//	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
 		//	      active CustomCommandLine in order and DefaultCLI isActive always return true.
 		final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
-		final String flinkYarnCLI = "org.apache.flink.yarn.cli.FlinkYarnCLI";
 		try {
 			customCommandLines.add(loadCustomCommandLine(flinkYarnSessionCLI, "y", "yarn"));
 		} catch (Exception e) {
 			LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
 		}
 
-		try {
-			customCommandLines.add(loadCustomCommandLine(flinkYarnCLI, "y", "yarn"));
-		} catch (Exception e) {
-			LOG.warn("Could not load CLI class {}.", flinkYarnCLI, e);
-		}
-
 		customCommandLines.add(new Flip6DefaultCLI());
 		customCommandLines.add(new DefaultCLI());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce62945a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
deleted file mode 100644
index 62ba207..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn.cli;
-
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnClusterClientV2;
-import org.apache.flink.yarn.YarnClusterDescriptorV2;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.UnsupportedEncodingException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-
-/**
- * Class handling the command line interface to the YARN per job mode under flip-6.
- */
-public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCLI.class);
-
-	/** The id for the CommandLine interface. */
-	private static final String ID = "yarn";
-
-	private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
-
-	//------------------------------------ Command Line argument options -------------------------
-	// the prefix transformation is used by the CliFrontend static constructor.
-	private final Option queue;
-	private final Option shipPath;
-	private final Option flinkJar;
-	private final Option jmMemory;
-	private final Option detached;
-	private final Option zookeeperNamespace;
-
-	private final Options allOptions;
-
-	/**
-	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>.
-	 */
-	private final Option dynamicProperties;
-
-	//------------------------------------ Internal fields -------------------------
-	// use detach mode as default
-	private boolean detachedMode = true;
-
-	public FlinkYarnCLI(String shortPrefix, String longPrefix) {
-
-		queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
-		shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
-		flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
-		jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
-		dynamicProperties = new Option(shortPrefix + "D", true, "Dynamic properties");
-		detached = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached");
-		zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
-
-		allOptions = new Options();
-		allOptions.addOption(flinkJar);
-		allOptions.addOption(jmMemory);
-		allOptions.addOption(queue);
-		allOptions.addOption(shipPath);
-		allOptions.addOption(dynamicProperties);
-		allOptions.addOption(detached);
-		allOptions.addOption(zookeeperNamespace);
-	}
-
-	public YarnClusterDescriptorV2 createDescriptor(
-			Configuration configuration,
-			String defaultApplicationName,
-			CommandLine cmd) {
-
-		YarnClusterDescriptorV2 yarnClusterDescriptor = new YarnClusterDescriptorV2(configuration, CliFrontend.getConfigurationDirectoryFromEnv());
-
-		// Jar Path
-		Path localJarPath;
-		if (cmd.hasOption(flinkJar.getOpt())) {
-			String userPath = cmd.getOptionValue(flinkJar.getOpt());
-			if (!userPath.startsWith("file://")) {
-				userPath = "file://" + userPath;
-			}
-			localJarPath = new Path(userPath);
-		} else {
-			LOG.info("No path for the flink jar passed. Using the location of "
-				+ yarnClusterDescriptor.getClass() + " to locate the jar");
-			String encodedJarPath =
-				yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
-			try {
-				// we have to decode the url encoded parts of the path
-				String decodedPath = URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
-				localJarPath = new Path(new File(decodedPath).toURI());
-			} catch (UnsupportedEncodingException e) {
-				throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
-					" Please supply a path manually via the -" + flinkJar.getOpt() + " option.");
-			}
-		}
-
-		yarnClusterDescriptor.setLocalJarPath(localJarPath);
-
-		List<File> shipFiles = new ArrayList<>();
-		// path to directory to ship
-		if (cmd.hasOption(shipPath.getOpt())) {
-			String shipPath = cmd.getOptionValue(this.shipPath.getOpt());
-			File shipDir = new File(shipPath);
-			if (shipDir.isDirectory()) {
-				shipFiles.add(shipDir);
-			} else {
-				LOG.warn("Ship directory is not a directory. Ignoring it.");
-			}
-		}
-
-		yarnClusterDescriptor.addShipFiles(shipFiles);
-
-		// queue
-		if (cmd.hasOption(queue.getOpt())) {
-			yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
-		}
-
-		String[] dynamicProperties = null;
-		if (cmd.hasOption(this.dynamicProperties.getOpt())) {
-			dynamicProperties = cmd.getOptionValues(this.dynamicProperties.getOpt());
-		}
-		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-
-		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
-
-		if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
-			// TODO: not support non detach mode now.
-			//this.detachedMode = false;
-		}
-		yarnClusterDescriptor.setDetachedMode(this.detachedMode);
-
-		if (defaultApplicationName != null) {
-			yarnClusterDescriptor.setName(defaultApplicationName);
-		}
-
-		if (cmd.hasOption(zookeeperNamespace.getOpt())) {
-			String zookeeperNamespace = cmd.getOptionValue(this.zookeeperNamespace.getOpt());
-			yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace);
-		}
-
-		return yarnClusterDescriptor;
-	}
-
-	private void printUsage() {
-		System.out.println("Usage:");
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setWidth(200);
-		formatter.setLeftPadding(5);
-
-		formatter.setSyntaxPrefix("   Optional");
-		Options options = new Options();
-		addGeneralOptions(options);
-		addRunOptions(options);
-		formatter.printHelp(" ", options);
-	}
-
-	@Override
-	public boolean isActive(CommandLine commandLine, Configuration configuration) {
-		String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
-		boolean yarnJobManager = ID.equals(jobManagerOption);
-		return yarnJobManager;
-	}
-
-	@Override
-	public String getId() {
-		return ID;
-	}
-
-	@Override
-	public void addRunOptions(Options baseOptions) {
-		for (Object option : allOptions.getOptions()) {
-			baseOptions.addOption((Option) option);
-		}
-	}
-
-	@Override
-	public void addGeneralOptions(Options baseOptions) {
-	}
-
-	@Override
-	public YarnClusterClientV2 retrieveCluster(
-			CommandLine cmdLine,
-			Configuration config,
-			String configurationDirectory) throws UnsupportedOperationException {
-
-		throw new UnsupportedOperationException("Not support retrieveCluster since Flip-6.");
-	}
-
-	@Override
-	public YarnClusterClientV2 createCluster(
-			String applicationName,
-			CommandLine cmdLine,
-			Configuration config,
-			String configurationDirectory,
-			List<URL> userJarFiles) throws Exception {
-		Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
-
-		YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(config, applicationName, cmdLine);
-		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
-
-		return new YarnClusterClientV2(
-			yarnClusterDescriptor,
-			config);
-	}
-
-	private void logAndSysout(String message) {
-		LOG.info(message);
-		System.out.println(message);
-	}
-
-}