You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/31 16:26:05 UTC
flink git commit: [FLINK-8330] [flip6] Remove FlinkYarnCLI
Repository: flink
Updated Branches:
refs/heads/master 8e7a71c05 -> ce62945ae
[FLINK-8330] [flip6] Remove FlinkYarnCLI
The FlinkYarnCLI is not needed and is, thus, being removed.
This closes #5217.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce62945a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce62945a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce62945a
Branch: refs/heads/master
Commit: ce62945ae6c1b1878bebc9a528e63ef7a54cb897
Parents: 8e7a71c
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 8 10:12:23 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Dec 31 17:25:38 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 7 -
.../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 244 -------------------
2 files changed, 251 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ce62945a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 07c6d65..c535783 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -128,19 +128,12 @@ public class CliFrontend {
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
// active CustomCommandLine in order and DefaultCLI isActive always return true.
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
- final String flinkYarnCLI = "org.apache.flink.yarn.cli.FlinkYarnCLI";
try {
customCommandLines.add(loadCustomCommandLine(flinkYarnSessionCLI, "y", "yarn"));
} catch (Exception e) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
- try {
- customCommandLines.add(loadCustomCommandLine(flinkYarnCLI, "y", "yarn"));
- } catch (Exception e) {
- LOG.warn("Could not load CLI class {}.", flinkYarnCLI, e);
- }
-
customCommandLines.add(new Flip6DefaultCLI());
customCommandLines.add(new DefaultCLI());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce62945a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
deleted file mode 100644
index 62ba207..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn.cli;
-
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnClusterClientV2;
-import org.apache.flink.yarn.YarnClusterDescriptorV2;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.UnsupportedEncodingException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-
-/**
- * Class handling the command line interface to the YARN per job mode under flip-6.
- */
-public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCLI.class);
-
- /** The id for the CommandLine interface. */
- private static final String ID = "yarn";
-
- private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
-
- //------------------------------------ Command Line argument options -------------------------
- // the prefix transformation is used by the CliFrontend static constructor.
- private final Option queue;
- private final Option shipPath;
- private final Option flinkJar;
- private final Option jmMemory;
- private final Option detached;
- private final Option zookeeperNamespace;
-
- private final Options allOptions;
-
- /**
- * Dynamic properties allow the user to specify additional configuration values with -D, such as
- * <tt> -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624</tt>.
- */
- private final Option dynamicProperties;
-
- //------------------------------------ Internal fields -------------------------
- // use detach mode as default
- private boolean detachedMode = true;
-
- public FlinkYarnCLI(String shortPrefix, String longPrefix) {
-
- queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
- shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
- flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
- jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
- dynamicProperties = new Option(shortPrefix + "D", true, "Dynamic properties");
- detached = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached");
- zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
-
- allOptions = new Options();
- allOptions.addOption(flinkJar);
- allOptions.addOption(jmMemory);
- allOptions.addOption(queue);
- allOptions.addOption(shipPath);
- allOptions.addOption(dynamicProperties);
- allOptions.addOption(detached);
- allOptions.addOption(zookeeperNamespace);
- }
-
- public YarnClusterDescriptorV2 createDescriptor(
- Configuration configuration,
- String defaultApplicationName,
- CommandLine cmd) {
-
- YarnClusterDescriptorV2 yarnClusterDescriptor = new YarnClusterDescriptorV2(configuration, CliFrontend.getConfigurationDirectoryFromEnv());
-
- // Jar Path
- Path localJarPath;
- if (cmd.hasOption(flinkJar.getOpt())) {
- String userPath = cmd.getOptionValue(flinkJar.getOpt());
- if (!userPath.startsWith("file://")) {
- userPath = "file://" + userPath;
- }
- localJarPath = new Path(userPath);
- } else {
- LOG.info("No path for the flink jar passed. Using the location of "
- + yarnClusterDescriptor.getClass() + " to locate the jar");
- String encodedJarPath =
- yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
- try {
- // we have to decode the url encoded parts of the path
- String decodedPath = URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
- localJarPath = new Path(new File(decodedPath).toURI());
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
- " Please supply a path manually via the -" + flinkJar.getOpt() + " option.");
- }
- }
-
- yarnClusterDescriptor.setLocalJarPath(localJarPath);
-
- List<File> shipFiles = new ArrayList<>();
- // path to directory to ship
- if (cmd.hasOption(shipPath.getOpt())) {
- String shipPath = cmd.getOptionValue(this.shipPath.getOpt());
- File shipDir = new File(shipPath);
- if (shipDir.isDirectory()) {
- shipFiles.add(shipDir);
- } else {
- LOG.warn("Ship directory is not a directory. Ignoring it.");
- }
- }
-
- yarnClusterDescriptor.addShipFiles(shipFiles);
-
- // queue
- if (cmd.hasOption(queue.getOpt())) {
- yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
- }
-
- String[] dynamicProperties = null;
- if (cmd.hasOption(this.dynamicProperties.getOpt())) {
- dynamicProperties = cmd.getOptionValues(this.dynamicProperties.getOpt());
- }
- String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-
- yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
-
- if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
- // TODO: not support non detach mode now.
- //this.detachedMode = false;
- }
- yarnClusterDescriptor.setDetachedMode(this.detachedMode);
-
- if (defaultApplicationName != null) {
- yarnClusterDescriptor.setName(defaultApplicationName);
- }
-
- if (cmd.hasOption(zookeeperNamespace.getOpt())) {
- String zookeeperNamespace = cmd.getOptionValue(this.zookeeperNamespace.getOpt());
- yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace);
- }
-
- return yarnClusterDescriptor;
- }
-
- private void printUsage() {
- System.out.println("Usage:");
- HelpFormatter formatter = new HelpFormatter();
- formatter.setWidth(200);
- formatter.setLeftPadding(5);
-
- formatter.setSyntaxPrefix(" Optional");
- Options options = new Options();
- addGeneralOptions(options);
- addRunOptions(options);
- formatter.printHelp(" ", options);
- }
-
- @Override
- public boolean isActive(CommandLine commandLine, Configuration configuration) {
- String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
- boolean yarnJobManager = ID.equals(jobManagerOption);
- return yarnJobManager;
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- @Override
- public void addRunOptions(Options baseOptions) {
- for (Object option : allOptions.getOptions()) {
- baseOptions.addOption((Option) option);
- }
- }
-
- @Override
- public void addGeneralOptions(Options baseOptions) {
- }
-
- @Override
- public YarnClusterClientV2 retrieveCluster(
- CommandLine cmdLine,
- Configuration config,
- String configurationDirectory) throws UnsupportedOperationException {
-
- throw new UnsupportedOperationException("Not support retrieveCluster since Flip-6.");
- }
-
- @Override
- public YarnClusterClientV2 createCluster(
- String applicationName,
- CommandLine cmdLine,
- Configuration config,
- String configurationDirectory,
- List<URL> userJarFiles) throws Exception {
- Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
-
- YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(config, applicationName, cmdLine);
- yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
-
- return new YarnClusterClientV2(
- yarnClusterDescriptor,
- config);
- }
-
- private void logAndSysout(String message) {
- LOG.info(message);
- System.out.println(message);
- }
-
-}