You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/17 08:45:18 UTC
[04/10] flink git commit: [FLINK-3667] refactor client communication
classes
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
new file mode 100644
index 0000000..7220a29
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -0,0 +1,943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
+
+/**
+* All classes in this package contain code taken from
+* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+* and
+* https://github.com/hortonworks/simple-yarn-app
+* and
+* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+*
+* The Flink jar is uploaded to HDFS by this client.
+* The application master and all the TaskManager containers get the jar file downloaded
+* by YARN into their local fs.
+*
+*/
+public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> {
+
+    // BUGFIX: the logger was created for YarnClusterDescriptor.class (copy-paste),
+    // which mis-attributed all log output from this shared base class.
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnClusterDescriptor.class);
+
+    /** Name of the Flink configuration file expected in the configuration directory. */
+    private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+
+    /**
+     * Minimum memory requirements, checked by the Client.
+     */
+    private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
+    private static final int MIN_TM_MEMORY = 768;
+
+    // Hadoop/YARN configuration used for all client-side YARN and HDFS interaction.
+    private Configuration conf = new YarnConfiguration();
+
+    /**
+     * Files (usually in a distributed file system) used for the YARN session of Flink.
+     * Contains configuration files and jar files.
+     */
+    private Path sessionFilesDir;
+
+    /**
+     * If the user has specified a different number of slots, we store them here
+     * (-1 means "not set").
+     */
+    private int slots = -1;
+
+    // Container sizes in megabytes; may be raised later to YARN's minimum allocation.
+    private int jobManagerMemoryMb = 1024;
+
+    private int taskManagerMemoryMb = 1024;
+
+    private int taskManagerCount = 1;
+
+    // Target YARN queue; null means the cluster default queue.
+    private String yarnQueue = null;
+
+    private String configurationDirectory;
+
+    private Path flinkConfigurationPath;
+
+    private Path flinkLoggingConfigurationPath; // optional
+
+    private Path flinkJarPath;
+
+    // URL-encoded dynamic properties (-D style overrides), forwarded to the AM.
+    private String dynamicPropertiesEncoded;
+
+    // Local files shipped to the cluster next to the Flink jar.
+    private List<File> shipFiles = new ArrayList<>();
+    private org.apache.flink.configuration.Configuration flinkConfiguration;
+
+    // Whether the client detaches after submission instead of monitoring the job.
+    private boolean detached;
+
+    // User-provided application name; overrides the generated default.
+    private String customName = null;
+
+    /**
+     * Creates the descriptor and eagerly loads the Flink configuration.
+     *
+     * Resolves the configuration directory from the environment (via CliFrontend),
+     * loads {@code flink-conf.yaml} from it, and registers an existing logback/log4j
+     * configuration file so it is shipped to the cluster.
+     *
+     * @throws RuntimeException if {@code flink-conf.yaml} cannot be located, or if the
+     *         test-only YARN site file cannot be added to the Hadoop configuration
+     */
+    public AbstractYarnClusterDescriptor() {
+        // for unit tests only
+        if (System.getenv("IN_TESTS") != null) {
+            try {
+                conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
+            } catch (Throwable t) {
+                throw new RuntimeException("Error", t);
+            }
+        }
+
+        // load the config
+        this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
+        GlobalConfiguration.loadConfiguration(configurationDirectory);
+        this.flinkConfiguration = GlobalConfiguration.getConfiguration();
+
+        File confFile = new File(configurationDirectory + File.separator + CONFIG_FILE_NAME);
+        if (!confFile.exists()) {
+            throw new RuntimeException("Unable to locate configuration file in " + confFile);
+        }
+        flinkConfigurationPath = new Path(confFile.getAbsolutePath());
+
+        // check if there is a logback or log4j file
+        if (configurationDirectory.length() > 0) {
+            // BUGFIX: use File.separator (the path-component separator, e.g. '/') instead of
+            // File.pathSeparator (the path-LIST separator, e.g. ':'). The old code built paths
+            // like "conf:logback.xml", which never exist, so the logging configuration files
+            // were silently never detected or shipped. Line with CONFIG_FILE_NAME above
+            // already used File.separator correctly.
+            File logback = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
+            if (logback.exists()) {
+                shipFiles.add(logback);
+                flinkLoggingConfigurationPath = new Path(logback.toURI());
+            }
+            File log4j = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
+            if (log4j.exists()) {
+                shipFiles.add(log4j);
+                if (flinkLoggingConfigurationPath != null) {
+                    // this means there is already a logback configuration file --> fail
+                    LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " +
+                        "Logback configuration files. Please delete or rename one of them.");
+                }
+                flinkLoggingConfigurationPath = new Path(log4j.toURI());
+            }
+        }
+    }
+
+ /**
+ * The class to bootstrap the application master of the Yarn cluster (runs main method).
+ * Supplied by the concrete subclass so each deployment mode can launch a different
+ * ApplicationMaster implementation.
+ */
+ protected abstract Class<?> getApplicationMasterClass();
+
+    /**
+     * Sets the JobManager container memory.
+     *
+     * @param memoryMb memory in megabytes, at least {@code MIN_JM_MEMORY}
+     * @throws IllegalArgumentException if the value is below the minimum
+     */
+    public void setJobManagerMemory(int memoryMb) {
+        if (memoryMb >= MIN_JM_MEMORY) {
+            this.jobManagerMemoryMb = memoryMb;
+        } else {
+            throw new IllegalArgumentException("The JobManager memory (" + memoryMb
+                + ") is below the minimum required memory amount " + "of " + MIN_JM_MEMORY + " MB");
+        }
+    }
+
+    /**
+     * Sets the TaskManager container memory.
+     *
+     * @param memoryMb memory in megabytes, at least {@code MIN_TM_MEMORY}
+     * @throws IllegalArgumentException if the value is below the minimum
+     */
+    public void setTaskManagerMemory(int memoryMb) {
+        if (memoryMb >= MIN_TM_MEMORY) {
+            this.taskManagerMemoryMb = memoryMb;
+        } else {
+            throw new IllegalArgumentException("The TaskManager memory (" + memoryMb
+                + ") is below the minimum required memory amount " + "of " + MIN_TM_MEMORY + " MB");
+        }
+    }
+
+    /** Replaces the Flink configuration used for deployment. */
+    public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
+        this.flinkConfiguration = conf;
+    }
+
+    /** @return the Flink configuration this descriptor deploys with */
+    public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+        return this.flinkConfiguration;
+    }
+
+    /**
+     * Sets the number of slots per TaskManager.
+     *
+     * @throws IllegalArgumentException if {@code slots} is not positive
+     */
+    public void setTaskManagerSlots(int slots) {
+        if (slots > 0) {
+            this.slots = slots;
+        } else {
+            throw new IllegalArgumentException("Number of TaskManager slots must be positive");
+        }
+    }
+
+    /** @return the configured slots per TaskManager, or -1 if unset */
+    public int getTaskManagerSlots() {
+        return slots;
+    }
+
+    /** Sets the YARN queue to submit the application to (null = cluster default). */
+    public void setQueue(String queue) {
+        this.yarnQueue = queue;
+    }
+
+    /**
+     * Sets the local path of the Flink uberjar to upload.
+     *
+     * @throws IllegalArgumentException if the path does not end with "jar"
+     */
+    public void setLocalJarPath(Path localJarPath) {
+        if (localJarPath.toString().endsWith("jar")) {
+            this.flinkJarPath = localJarPath;
+        } else {
+            throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
+        }
+    }
+
+    /** Sets the path of the flink-conf.yaml to ship. */
+    public void setConfigurationFilePath(Path confPath) {
+        this.flinkConfigurationPath = confPath;
+    }
+
+    /** Sets the local Flink configuration directory. */
+    public void setConfigurationDirectory(String configurationDirectory) {
+        this.configurationDirectory = configurationDirectory;
+    }
+
+    /** Sets the logging (logback/log4j) configuration file to ship. */
+    public void setFlinkLoggingConfigurationPath(Path logConfPath) {
+        this.flinkLoggingConfigurationPath = logConfPath;
+    }
+
+    /** @return the logging configuration path, or null if none was found/set */
+    public Path getFlinkLoggingConfigurationPath() {
+        return this.flinkLoggingConfigurationPath;
+    }
+
+ public void setTaskManagerCount(int tmCount) {
+ if(tmCount < 1) {
+ throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
+ }
+ this.taskManagerCount = tmCount;
+ }
+
+ public int getTaskManagerCount() {
+ return this.taskManagerCount;
+ }
+
+ public void setShipFiles(List<File> shipFiles) {
+ for(File shipFile: shipFiles) {
+ // remove uberjar from ship list (by default everything in the lib/ folder is added to
+ // the list of files to ship, but we handle the uberjar separately.
+ if(!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
+ this.shipFiles.add(shipFile);
+ }
+ }
+ }
+
+ public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
+ this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
+ }
+
+ public String getDynamicPropertiesEncoded() {
+ return this.dynamicPropertiesEncoded;
+ }
+
+
+    /**
+     * Validates that all mandatory deployment settings are present.
+     *
+     * Only warns (does not fail) when neither HADOOP_CONF_DIR nor YARN_CONF_DIR is
+     * set, since the Hadoop configuration may still be found via the classpath.
+     *
+     * @throws YarnDeploymentException if a required setting is missing or invalid
+     */
+    private void isReadyForDeployment() throws YarnDeploymentException {
+        if (taskManagerCount <= 0) {
+            throw new YarnDeploymentException("Taskmanager count must be positive");
+        }
+        if (this.flinkJarPath == null) {
+            throw new YarnDeploymentException("The Flink jar path is null");
+        }
+        if (this.configurationDirectory == null) {
+            throw new YarnDeploymentException("Configuration directory not set");
+        }
+        if (this.flinkConfigurationPath == null) {
+            throw new YarnDeploymentException("Configuration path not set");
+        }
+        if (this.flinkConfiguration == null) {
+            throw new YarnDeploymentException("Flink configuration object has not been set");
+        }
+
+        // check if required Hadoop environment variables are set. If not, warn user
+        if (System.getenv("HADOOP_CONF_DIR") == null &&
+            System.getenv("YARN_CONF_DIR") == null) {
+            // BUGFIX: added the missing space after "is set." — the concatenated message
+            // previously read "...is set.The Flink YARN Client...".
+            LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
+                "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
+                "configuration for accessing YARN.");
+        }
+    }
+
+    /**
+     * Tries to fit {@code toAllocate} MB onto one of the node managers (first fit).
+     * On success the chosen entry in {@code nodeManagers} is reduced in place.
+     *
+     * @param nodeManagers free memory per node manager, in MB (mutated on success)
+     * @param toAllocate   container size to place, in MB
+     * @return true if some node had enough free memory, false otherwise
+     */
+    private static boolean allocateResource(int[] nodeManagers, int toAllocate) {
+        int i = 0;
+        while (i < nodeManagers.length) {
+            if (nodeManagers[i] >= toAllocate) {
+                nodeManagers[i] -= toAllocate;
+                return true;
+            }
+            i++;
+        }
+        return false;
+    }
+
+    /** Sets whether the client detaches after submission instead of monitoring. */
+    public void setDetachedMode(boolean detachedMode) {
+        this.detached = detachedMode;
+    }
+
+    /** @return true if the client runs in detached mode */
+    public boolean isDetachedMode() {
+        return this.detached;
+    }
+
+
+    /**
+     * Gets a Hadoop Yarn client.
+     *
+     * @param conf the Hadoop configuration to initialize the client with
+     * @return a started YarnClient which has to be shut down manually by the caller
+     */
+    public static YarnClient getYarnClient(Configuration conf) {
+        final YarnClient client = YarnClient.createYarnClient();
+        client.init(conf);
+        client.start();
+        return client;
+    }
+
+    /**
+     * Deploys the cluster, running the deployment under the current Kerberos user
+     * when Hadoop security is enabled.
+     *
+     * @throws YarnDeploymentException in secure mode when no Kerberos credentials exist
+     */
+    @Override
+    public YarnClusterClient deploy() throws Exception {
+
+        UserGroupInformation.setConfiguration(conf);
+        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+        // Without security, deploy directly as the process user.
+        if (!UserGroupInformation.isSecurityEnabled()) {
+            return deployInternal();
+        }
+
+        if (!currentUser.hasKerberosCredentials()) {
+            throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
+                "You may use kinit to authenticate and request a TGT from the Kerberos server.");
+        }
+
+        // Run the deployment with the authenticated user's privileges.
+        return currentUser.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
+            @Override
+            public YarnClusterClient run() throws Exception {
+                return deployInternal();
+            }
+        });
+    }
+
+ /**
+ * Deploys a Flink session on YARN and blocks until the
+ * ApplicationMaster/JobManager container has reached the RUNNING state.
+ *
+ * Steps, in order (later steps rely on earlier results):
+ * 1) validate descriptor state; 2) create the YARN application; 3) apply dynamic
+ * properties and the default filesystem scheme; 4) sanity-check queue and cluster
+ * resources (warnings only); 5) assemble the AM launch context: start command,
+ * uploaded jar/config/ship files, security tokens, environment; 6) submit the
+ * application and poll its report until RUNNING.
+ *
+ * @return a YarnClusterClient connected to the freshly deployed cluster
+ * @throws YarnDeploymentException if the requested resources exceed the cluster
+ * maximum or the application terminates during deployment
+ * @throws Exception on I/O or YARN communication failures
+ */
+ protected YarnClusterClient deployInternal() throws Exception {
+ isReadyForDeployment();
+
+ LOG.info("Using values:");
+ LOG.info("\tTaskManager count = {}", taskManagerCount);
+ LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
+ LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
+
+ // Create application via yarnClient
+ final YarnClient yarnClient = getYarnClient(conf);
+ final YarnClientApplication yarnApplication = yarnClient.createApplication();
+ GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+
+ // ------------------ Add dynamic properties to local flinkConfiguration ------
+
+ Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
+ for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
+ flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
+ }
+
+ // ------------------ Set default file system scheme -------------------------
+
+ try {
+ org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
+ } catch (IOException e) {
+ throw new IOException("Error while setting the default " +
+ "filesystem scheme from configuration.", e);
+ }
+ // ------------------ Check if the specified queue exists --------------------
+
+ // Queue problems only produce warnings: YARN itself rejects truly invalid queues.
+ try {
+ List<QueueInfo> queues = yarnClient.getAllQueues();
+ if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
+ boolean queueFound = false;
+ for (QueueInfo queue : queues) {
+ if (queue.getQueueName().equals(this.yarnQueue)) {
+ queueFound = true;
+ break;
+ }
+ }
+ if (!queueFound) {
+ String queueNames = "";
+ for (QueueInfo queue : queues) {
+ queueNames += queue.getQueueName() + ", ";
+ }
+ LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
+ "Available queues: " + queueNames);
+ }
+ } else {
+ LOG.debug("The YARN cluster does not have any queues configured");
+ }
+ } catch(Throwable e) {
+ // best-effort check: a failure here must not abort the deployment
+ LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Error details", e);
+ }
+ }
+
+ // ------------------ Check if the YARN ClusterClient has the requested resources --------------
+
+ // the yarnMinAllocationMB specifies the smallest possible container allocation size.
+ // all allocations below this value are automatically set to this value.
+ final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+ if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
+ LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+ + "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
+ "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
+ "you requested will start.");
+ }
+
+ // set the memory to minAllocationMB to do the next checks correctly
+ if(jobManagerMemoryMb < yarnMinAllocationMB) {
+ jobManagerMemoryMb = yarnMinAllocationMB;
+ }
+ if(taskManagerMemoryMb < yarnMinAllocationMB) {
+ taskManagerMemoryMb = yarnMinAllocationMB;
+ }
+
+ // Requests above the cluster's maximum container size can never succeed -> hard failure.
+ Resource maxRes = appResponse.getMaximumResourceCapability();
+ final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
+ if(jobManagerMemoryMb > maxRes.getMemory() ) {
+ failSessionDuringDeployment(yarnClient, yarnApplication);
+ throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
+ + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
+ }
+
+ if(taskManagerMemoryMb > maxRes.getMemory() ) {
+ failSessionDuringDeployment(yarnClient, yarnApplication);
+ throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
+ + "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
+ }
+
+ final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
+ "connecting from the beginning because the resources are currently not available in the cluster. " +
+ "The allocation might take more time than usual because the Flink YARN client needs to wait until " +
+ "the resources become available.";
+ int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
+ ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+ LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+ + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
+
+ }
+ if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
+ LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
+ + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+ }
+ if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
+ LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
+ + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+ }
+
+ // ----------------- check if the requested containers fit into the cluster.
+
+ // Simulate first-fit placement on a copy of the per-node free-memory list;
+ // failures here are warnings only, since YARN may still free resources later.
+ int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
+ // first, allocate the jobManager somewhere.
+ if(!allocateResource(nmFree, jobManagerMemoryMb)) {
+ LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
+ "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
+ Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
+ }
+ // allocate TaskManagers
+ for(int i = 0; i < taskManagerCount; i++) {
+ if(!allocateResource(nmFree, taskManagerMemoryMb)) {
+ LOG.warn("There is not enough memory available in the YARN cluster. " +
+ "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
+ "NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
+ "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
+ "the following NodeManagers are available: " + Arrays.toString(nmFree) + NOTE_RSC );
+ }
+ }
+
+ // ------------------ Prepare Application Master Container ------------------------------
+
+ // respect custom JVM options in the YAML file
+ final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+ String logbackFile = configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME;
+ boolean hasLogback = new File(logbackFile).exists();
+ String log4jFile = configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME;
+
+ boolean hasLog4j = new File(log4jFile).exists();
+ if(hasLogback) {
+ shipFiles.add(new File(logbackFile));
+ }
+ if(hasLog4j) {
+ shipFiles.add(new File(log4jFile));
+ }
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+ // Build the AM start command: JVM heap sized from the JM memory, user JVM opts,
+ // optional logging properties, the AM main class, and stdout/stderr redirection.
+ String amCommand = "$JAVA_HOME/bin/java"
+ + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration)
+ + "M " + javaOpts;
+
+ if(hasLogback || hasLog4j) {
+ amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
+
+ if(hasLogback) {
+ amCommand += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
+ }
+
+ if(hasLog4j) {
+ amCommand += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
+ }
+ }
+
+ amCommand += " " + getApplicationMasterClass().getName() + " "
+ + " 1>"
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"
+ + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
+ amContainer.setCommands(Collections.singletonList(amCommand));
+
+ LOG.debug("Application Master start command: " + amCommand);
+
+ // initialize HDFS
+ // Copy the application master jar to the filesystem
+ // Create a local resource to point to the destination jar path
+ final FileSystem fs = FileSystem.get(conf);
+
+ // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
+ if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+ fs.getScheme().startsWith("file")) {
+ LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ + "The Flink YARN client needs to store its files in a distributed file system");
+ }
+
+ // Set-up ApplicationSubmissionContext for the application
+ ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
+
+ if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
+ // activate re-execution of failed applications
+ appContext.setMaxAppAttempts(
+ flinkConfiguration.getInteger(
+ ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
+
+ activateHighAvailabilitySupport(appContext);
+ } else {
+ // set number of application retries to 1 in the default case
+ appContext.setMaxAppAttempts(
+ flinkConfiguration.getInteger(
+ ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+ 1));
+ }
+
+ final ApplicationId appId = appContext.getApplicationId();
+
+ // Setup jar for ApplicationMaster
+ LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+ LocalResource flinkConf = Records.newRecord(LocalResource.class);
+ Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
+ Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
+ Map<String, LocalResource> localResources = new HashMap<>(2);
+ localResources.put("flink.jar", appMasterJar);
+ localResources.put("flink-conf.yaml", flinkConf);
+
+
+ // setup security tokens (code from apache storm)
+ // paths[0]/paths[1] are reserved for the uberjar and the config; ship files follow.
+ final Path[] paths = new Path[2 + shipFiles.size()];
+ StringBuilder envShipFileList = new StringBuilder();
+ // upload ship files
+ for (int i = 0; i < shipFiles.size(); i++) {
+ File shipFile = shipFiles.get(i);
+ LocalResource shipResources = Records.newRecord(LocalResource.class);
+ Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+ paths[2 + i] = Utils.setupLocalResource(fs, appId.toString(),
+ shipLocalPath, shipResources, fs.getHomeDirectory());
+ localResources.put(shipFile.getName(), shipResources);
+
+ // comma-separated remote paths, passed to the AM via environment variable
+ envShipFileList.append(paths[2 + i]);
+ if(i+1 < shipFiles.size()) {
+ envShipFileList.append(',');
+ }
+ }
+
+ paths[0] = remotePathJar;
+ paths[1] = remotePathConf;
+ sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+
+ // owner-only access to the uploaded session files
+ FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+ fs.setPermission(sessionFilesDir, permission); // set permission for path.
+
+ Utils.setTokensFor(amContainer, paths, conf);
+
+ amContainer.setLocalResources(localResources);
+ // NOTE(review): fs is used again below (fs.getHomeDirectory() for the AM environment)
+ // after this close() call; FileSystem.get may also return a shared cached instance.
+ // Verify whether closing here is safe / intended.
+ fs.close();
+
+ // Setup CLASSPATH for ApplicationMaster
+ Map<String, String> appMasterEnv = new HashMap<>();
+ // set user specified app master environment variables
+ appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration));
+ // set classpath from YARN configuration
+ Utils.setupEnv(conf, appMasterEnv);
+ // set Flink on YARN internal configuration values
+ appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
+ appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
+ appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString() );
+ appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
+ appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
+ appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
+ appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
+ appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
+ appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
+
+ if(dynamicPropertiesEncoded != null) {
+ appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+ }
+
+ amContainer.setEnvironment(appMasterEnv);
+
+ // Set up resource type requirements for ApplicationMaster
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(jobManagerMemoryMb);
+ capability.setVirtualCores(1);
+
+ String name;
+ if(customName == null) {
+ name = "Flink session with " + taskManagerCount + " TaskManagers";
+ if(detached) {
+ name += " (detached)";
+ }
+ } else {
+ name = customName;
+ }
+
+ appContext.setApplicationName(name); // application name
+ appContext.setApplicationType("Apache Flink");
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(capability);
+ if(yarnQueue != null) {
+ appContext.setQueue(yarnQueue);
+ }
+
+ // add a hook to clean up in case deployment fails
+ Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication);
+ Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
+ LOG.info("Submitting application master " + appId);
+ yarnClient.submitApplication(appContext);
+
+ // Poll the application report once per second until the app is RUNNING;
+ // any terminal state during startup is a deployment failure.
+ LOG.info("Waiting for the cluster to be allocated");
+ int waittime = 0;
+ ApplicationReport report;
+ loop: while( true ) {
+ try {
+ report = yarnClient.getApplicationReport(appId);
+ } catch (IOException e) {
+ throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
+ }
+ YarnApplicationState appState = report.getYarnApplicationState();
+ switch(appState) {
+ case FAILED:
+ case FINISHED:
+ case KILLED:
+ throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
+ + appState + " during deployment. \n" +
+ "Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
+ "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
+ "yarn logs -applicationId " + appId);
+ //break ..
+ case RUNNING:
+ LOG.info("YARN application has been deployed successfully.");
+ break loop;
+ default:
+ LOG.info("Deploying cluster, current state " + appState);
+ if(waittime > 60000) {
+ LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
+ }
+
+ }
+ waittime += 1000;
+ Thread.sleep(1000);
+ }
+ // print the application id for user to cancel themselves.
+ if (isDetachedMode()) {
+ LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
+ "Flink on YARN, use the following command or a YARN web interface to stop " +
+ "it:\nyarn application -kill " + appId + "\nPlease also note that the " +
+ "temporary files of the YARN session in the home directoy will not be removed.");
+ }
+ // since deployment was successful, remove the hook
+ try {
+ Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
+ } catch (IllegalStateException e) {
+ // we're already in the shut down hook.
+ }
+
+ String host = report.getHost();
+ int port = report.getRpcPort();
+ String trackingURL = report.getTrackingUrl();
+
+ // Correctly initialize the Flink config
+ flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
+ flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+
+ // the Flink cluster is deployed in YARN. Represent cluster
+ return new YarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir);
+ }
+
+ /**
+ * Kills YARN application and stops YARN client.
+ *
+ * Use this method to kill the App before it has been properly deployed.
+ * Killing is best-effort (the application may not exist yet), so failures
+ * are only logged at debug level; the client is stopped unconditionally.
+ */
+ private void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnApplication) {
+ LOG.info("Killing YARN application");
+
+ try {
+ yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
+ } catch (Exception e) {
+ // we only log a debug message here because the "killApplication" call is a best-effort
+ // call (we don't know if the application has been deployed when the error occurred).
+ LOG.debug("Error while killing YARN application", e);
+ }
+ yarnClient.stop();
+ }
+
+
+    /**
+     * Immutable snapshot of the free memory currently available in the YARN cluster:
+     * the total across all running node managers, the single largest free amount
+     * (an upper bound for one container), and the per-node free list.
+     */
+    private static class ClusterResourceDescription {
+        public final int totalFreeMemory;
+        public final int containerLimit;
+        public final int[] nodeManagersFree;
+
+        public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
+            this.totalFreeMemory = totalFreeMemory;
+            this.containerLimit = containerLimit;
+            this.nodeManagersFree = nodeManagersFree;
+        }
+    }
+
+ private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
+ List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+
+ int totalFreeMemory = 0;
+ int containerLimit = 0;
+ int[] nodeManagersFree = new int[nodes.size()];
+
+ for(int i = 0; i < nodes.size(); i++) {
+ NodeReport rep = nodes.get(i);
+ int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
+ nodeManagersFree[i] = free;
+ totalFreeMemory += free;
+ if(free > containerLimit) {
+ containerLimit = free;
+ }
+ }
+ return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
+ }
+
+    /**
+     * Renders a human-readable summary of the YARN cluster: per-NodeManager
+     * ID/memory/cores/health/containers, aggregated totals, and the configured queues.
+     *
+     * @return the formatted cluster report
+     * @throws Exception if querying the YARN ResourceManager fails
+     */
+    @Override
+    public String getClusterDescription() throws Exception {
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+
+        YarnClient yarnClient = getYarnClient(conf);
+        try {
+            YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+
+            ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
+            List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+            final String format = "|%-16s |%-16s %n";
+            ps.printf("|Property |Value %n");
+            ps.println("+---------------------------------------+");
+            int totalMemory = 0;
+            int totalCores = 0;
+            for (NodeReport rep : nodes) {
+                final Resource res = rep.getCapability();
+                totalMemory += res.getMemory();
+                totalCores += res.getVirtualCores();
+                ps.format(format, "NodeID", rep.getNodeId());
+                ps.format(format, "Memory", res.getMemory() + " MB");
+                ps.format(format, "vCores", res.getVirtualCores());
+                ps.format(format, "HealthReport", rep.getHealthReport());
+                ps.format(format, "Containers", rep.getNumContainers());
+                ps.println("+---------------------------------------+");
+            }
+            ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
+            List<QueueInfo> qInfo = yarnClient.getAllQueues();
+            for (QueueInfo q : qInfo) {
+                ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
+                    q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
+            }
+        } finally {
+            // BUGFIX: stop the client even when a query above throws (it was leaked before),
+            // and close/flush the PrintStream BEFORE reading the byte array — PrintStream
+            // buffers internally, so toString() could previously return truncated output.
+            yarnClient.stop();
+            ps.close();
+        }
+        return baos.toString();
+    }
+
+ /**
+  * Returns the directory (usually on a distributed file system) that holds the
+  * jar and configuration files of this YARN session, as a string.
+  *
+  * <p>NOTE(review): assumes {@code sessionFilesDir} has been set by a prior
+  * deployment — a NullPointerException is thrown otherwise, as before.
+  */
+ public String getSessionFilesDir() {
+     return this.sessionFilesDir.toString();
+ }
+
+ /**
+  * Sets a custom name for the YARN application submitted by this descriptor.
+  *
+  * @param name the application name; must not be {@code null}
+  * @throws IllegalArgumentException if {@code name} is {@code null}
+  */
+ public void setName(String name) {
+     if (name == null) {
+         throw new IllegalArgumentException("The passed name is null");
+     }
+     this.customName = name;
+ }
+
+ private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
+ ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
+
+ reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
+ reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis());
+ }
+
+ /**
+  * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
+  * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval
+  * methods. Depending on the Hadoop version these methods are supported or not. If the methods
+  * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or
+  * setAttemptFailuresValidityInterval are called.
+  */
+ private static class ApplicationSubmissionContextReflector {
+     private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
+
+     private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
+     private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
+
+     private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
+
+     public static ApplicationSubmissionContextReflector getInstance() {
+         return instance;
+     }
+
+     /** Resolved method, or {@code null} when the Hadoop version does not provide it. */
+     private final Method keepContainersMethod;
+     /** Resolved method, or {@code null} when the Hadoop version does not provide it. */
+     private final Method attemptFailuresValidityIntervalMethod;
+
+     private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
+         // setKeepContainersAcrossApplicationAttempts is only supported by Hadoop 2.4.0 onwards
+         this.keepContainersMethod = findOptionalMethod(clazz, keepContainersMethodName, boolean.class);
+         // setAttemptFailuresValidityInterval is only supported by Hadoop 2.6.0 onwards
+         this.attemptFailuresValidityIntervalMethod = findOptionalMethod(clazz, attemptsFailuresValidityIntervalMethodName, long.class);
+     }
+
+     /**
+      * Looks up a public method by name and single parameter type. Shared by the
+      * constructor to avoid duplicating the try/catch/log pattern per method.
+      *
+      * @return the resolved {@link Method}, or {@code null} if the Hadoop version
+      *         on the classpath does not declare it
+      */
+     private static Method findOptionalMethod(Class<ApplicationSubmissionContext> clazz, String methodName, Class<?> parameterType) {
+         try {
+             Method method = clazz.getMethod(methodName, parameterType);
+             LOG.debug("{} supports method {}.", clazz.getCanonicalName(), methodName);
+             return method;
+         } catch (NoSuchMethodException e) {
+             LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), methodName);
+             // return null because the Hadoop version apparently does not support this call.
+             return null;
+         }
+     }
+
+     public void setKeepContainersAcrossApplicationAttempts(
+             ApplicationSubmissionContext appContext,
+             boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
+
+         if (keepContainersMethod != null) {
+             LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
+                 appContext.getClass().getCanonicalName());
+             keepContainersMethod.invoke(appContext, keepContainers);
+         } else {
+             LOG.debug("{} does not support method {}. Doing nothing.",
+                 appContext.getClass().getCanonicalName(), keepContainersMethodName);
+         }
+     }
+
+     public void setAttemptFailuresValidityInterval(
+             ApplicationSubmissionContext appContext,
+             long validityInterval) throws InvocationTargetException, IllegalAccessException {
+         if (attemptFailuresValidityIntervalMethod != null) {
+             LOG.debug("Calling method {} of {}.",
+                 attemptFailuresValidityIntervalMethod.getName(),
+                 appContext.getClass().getCanonicalName());
+             attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval);
+         } else {
+             LOG.debug("{} does not support method {}. Doing nothing.",
+                 appContext.getClass().getCanonicalName(),
+                 attemptsFailuresValidityIntervalMethodName);
+         }
+     }
+ }
+
+ /**
+  * Unchecked exception signalling that deploying the Flink cluster on YARN failed.
+  */
+ private static class YarnDeploymentException extends RuntimeException {
+     private static final long serialVersionUID = -812040641215388943L;
+
+     /** Creates an exception without message or cause. */
+     public YarnDeploymentException() {
+     }
+
+     /** Creates an exception carrying only a detail message. */
+     public YarnDeploymentException(String message) {
+         super(message);
+     }
+
+     /** Creates an exception carrying a detail message and the underlying cause. */
+     public YarnDeploymentException(String message, Throwable cause) {
+         super(message, cause);
+     }
+ }
+
+ /**
+  * JVM shutdown hook that cancels an in-flight deployment and removes the
+  * session files from the distributed file system, so an aborted client
+  * invocation does not leave a half-deployed application behind.
+  */
+ private class DeploymentFailureHook extends Thread {
+
+     // Fields made final and declared before the constructor (they were
+     // mutable and declared after it in the original).
+     private final YarnClient yarnClient;
+     private final YarnClientApplication yarnApplication;
+
+     DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication) {
+         this.yarnClient = yarnClient;
+         this.yarnApplication = yarnApplication;
+     }
+
+     @Override
+     public void run() {
+         LOG.info("Cancelling deployment from Deployment Failure Hook");
+         failSessionDuringDeployment(yarnClient, yarnApplication);
+         LOG.info("Deleting files in " + sessionFilesDir);
+         // try-with-resources closes fs even when delete() throws; the
+         // original leaked the FileSystem on that path (close only on success).
+         try (FileSystem fs = FileSystem.get(conf)) {
+             fs.delete(sessionFilesDir, true);
+         } catch (IOException e) {
+             LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
+         }
+     }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
deleted file mode 100644
index 467e06d..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-/**
- * Default implementation of {@link FlinkYarnClientBase} which starts an {@link YarnApplicationMasterRunner}.
- */
-public class FlinkYarnClient extends FlinkYarnClientBase {
- @Override
- protected Class<?> getApplicationMasterClass() {
- return YarnApplicationMasterRunner.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
deleted file mode 100644
index 6f81d09..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
+++ /dev/null
@@ -1,907 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Records;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
-* All classes in this package contain code taken from
-* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
-* and
-* https://github.com/hortonworks/simple-yarn-app
-* and
-* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
-*
-* The Flink jar is uploaded to HDFS by this client.
-* The application master and all the TaskManager containers get the jar file downloaded
-* by YARN into their local fs.
-*
-*/
-public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
-
- /**
- * Minimum memory requirements, checked by the Client.
- */
- private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
- private static final int MIN_TM_MEMORY = 768;
-
- private Configuration conf;
- private YarnClient yarnClient;
- private YarnClientApplication yarnApplication;
- private Thread deploymentFailureHook = new DeploymentFailureHook();
-
- /**
- * Files (usually in a distributed file system) used for the YARN session of Flink.
- * Contains configuration files and jar files.
- */
- private Path sessionFilesDir;
-
- /**
- * If the user has specified a different number of slots, we store them here
- */
- private int slots = -1;
-
- private int jobManagerMemoryMb = 1024;
-
- private int taskManagerMemoryMb = 1024;
-
- private int taskManagerCount = 1;
-
- private String yarnQueue = null;
-
- private String configurationDirectory;
-
- private Path flinkConfigurationPath;
-
- private Path flinkLoggingConfigurationPath; // optional
-
- private Path flinkJarPath;
-
- private String dynamicPropertiesEncoded;
-
- private List<File> shipFiles = new ArrayList<>();
- private org.apache.flink.configuration.Configuration flinkConfiguration;
-
- private boolean detached;
-
- private String customName = null;
-
- public FlinkYarnClientBase() {
- conf = new YarnConfiguration();
- if(this.yarnClient == null) {
- // Create yarnClient
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
- yarnClient.start();
- }
-
- // for unit tests only
- if(System.getenv("IN_TESTS") != null) {
- try {
- conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
- } catch (Throwable t) {
- throw new RuntimeException("Error",t);
- }
- }
- }
-
- /**
- * The class to bootstrap the application master of the Yarn cluster (runs main method).
- */
- protected abstract Class<?> getApplicationMasterClass();
-
- @Override
- public void setJobManagerMemory(int memoryMb) {
- if(memoryMb < MIN_JM_MEMORY) {
- throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
- + "of " + MIN_JM_MEMORY+ " MB");
- }
- this.jobManagerMemoryMb = memoryMb;
- }
-
- @Override
- public void setTaskManagerMemory(int memoryMb) {
- if(memoryMb < MIN_TM_MEMORY) {
- throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
- + "of " + MIN_TM_MEMORY+ " MB");
- }
- this.taskManagerMemoryMb = memoryMb;
- }
-
- @Override
- public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
- this.flinkConfiguration = conf;
- }
-
- @Override
- public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
- return flinkConfiguration;
- }
-
- @Override
- public void setTaskManagerSlots(int slots) {
- if(slots <= 0) {
- throw new IllegalArgumentException("Number of TaskManager slots must be positive");
- }
- this.slots = slots;
- }
-
- @Override
- public int getTaskManagerSlots() {
- return this.slots;
- }
-
- @Override
- public void setQueue(String queue) {
- this.yarnQueue = queue;
- }
-
- @Override
- public void setLocalJarPath(Path localJarPath) {
- if(!localJarPath.toString().endsWith("jar")) {
- throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
- }
- this.flinkJarPath = localJarPath;
- }
-
- @Override
- public void setConfigurationFilePath(Path confPath) {
- flinkConfigurationPath = confPath;
- }
-
- @Override
- public void setConfigurationDirectory(String configurationDirectory) {
- this.configurationDirectory = configurationDirectory;
- }
-
- @Override
- public void setFlinkLoggingConfigurationPath(Path logConfPath) {
- flinkLoggingConfigurationPath = logConfPath;
- }
-
- @Override
- public Path getFlinkLoggingConfigurationPath() {
- return flinkLoggingConfigurationPath;
- }
-
- @Override
- public void setTaskManagerCount(int tmCount) {
- if(tmCount < 1) {
- throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
- }
- this.taskManagerCount = tmCount;
- }
-
- @Override
- public int getTaskManagerCount() {
- return this.taskManagerCount;
- }
-
- @Override
- public void setShipFiles(List<File> shipFiles) {
- for(File shipFile: shipFiles) {
- // remove uberjar from ship list (by default everything in the lib/ folder is added to
- // the list of files to ship, but we handle the uberjar separately.
- if(!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
- this.shipFiles.add(shipFile);
- }
- }
- }
-
- @Override
- public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
- this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
- }
-
- @Override
- public String getDynamicPropertiesEncoded() {
- return this.dynamicPropertiesEncoded;
- }
-
-
- public void isReadyForDeployment() throws YarnDeploymentException {
- if(taskManagerCount <= 0) {
- throw new YarnDeploymentException("Taskmanager count must be positive");
- }
- if(this.flinkJarPath == null) {
- throw new YarnDeploymentException("The Flink jar path is null");
- }
- if(this.configurationDirectory == null) {
- throw new YarnDeploymentException("Configuration directory not set");
- }
- if(this.flinkConfigurationPath == null) {
- throw new YarnDeploymentException("Configuration path not set");
- }
- if(this.flinkConfiguration == null) {
- throw new YarnDeploymentException("Flink configuration object has not been set");
- }
-
- // check if required Hadoop environment variables are set. If not, warn user
- if(System.getenv("HADOOP_CONF_DIR") == null &&
- System.getenv("YARN_CONF_DIR") == null) {
- LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
- "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
- "configuration for accessing YARN.");
- }
- }
-
- public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
- for(int i = 0; i < nodeManagers.length; i++) {
- if(nodeManagers[i] >= toAllocate) {
- nodeManagers[i] -= toAllocate;
- return true;
- }
- }
- return false;
- }
-
- @Override
- public void setDetachedMode(boolean detachedMode) {
- this.detached = detachedMode;
- }
-
- @Override
- public boolean isDetached() {
- return detached;
- }
-
- @Override
- public AbstractFlinkYarnCluster deploy() throws Exception {
-
- UserGroupInformation.setConfiguration(conf);
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- if (!ugi.hasKerberosCredentials()) {
- throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
- "You may use kinit to authenticate and request a TGT from the Kerberos server.");
- }
- return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
- @Override
- public AbstractFlinkYarnCluster run() throws Exception {
- return deployInternal();
- }
- });
- } else {
- return deployInternal();
- }
- }
-
-
-
- /**
- * This method will block until the ApplicationMaster/JobManager have been
- * deployed on YARN.
- */
- protected AbstractFlinkYarnCluster deployInternal() throws Exception {
- isReadyForDeployment();
-
- LOG.info("Using values:");
- LOG.info("\tTaskManager count = {}", taskManagerCount);
- LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
- LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
-
- // Create application via yarnClient
- yarnApplication = yarnClient.createApplication();
- GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
-
- // ------------------ Add dynamic properties to local flinkConfiguraton ------
-
- Map<String, String> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
- for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
- flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
- }
-
- try {
- org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
- } catch (IOException e) {
- throw new IOException("Error while setting the default " +
- "filesystem scheme from configuration.", e);
- }
- // ------------------ Check if the specified queue exists --------------
-
- try {
- List<QueueInfo> queues = yarnClient.getAllQueues();
- if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
- boolean queueFound = false;
- for (QueueInfo queue : queues) {
- if (queue.getQueueName().equals(this.yarnQueue)) {
- queueFound = true;
- break;
- }
- }
- if (!queueFound) {
- String queueNames = "";
- for (QueueInfo queue : queues) {
- queueNames += queue.getQueueName() + ", ";
- }
- LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
- "Available queues: " + queueNames);
- }
- } else {
- LOG.debug("The YARN cluster does not have any queues configured");
- }
- } catch(Throwable e) {
- LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Error details", e);
- }
- }
-
- // ------------------ Check if the YARN Cluster has the requested resources --------------
-
- // the yarnMinAllocationMB specifies the smallest possible container allocation size.
- // all allocations below this value are automatically set to this value.
- final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
- if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
- LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
- + "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
- "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
- "you requested will start.");
- }
-
- // set the memory to minAllocationMB to do the next checks correctly
- if(jobManagerMemoryMb < yarnMinAllocationMB) {
- jobManagerMemoryMb = yarnMinAllocationMB;
- }
- if(taskManagerMemoryMb < yarnMinAllocationMB) {
- taskManagerMemoryMb = yarnMinAllocationMB;
- }
-
- Resource maxRes = appResponse.getMaximumResourceCapability();
- final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
- if(jobManagerMemoryMb > maxRes.getMemory() ) {
- failSessionDuringDeployment();
- throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
- }
-
- if(taskManagerMemoryMb > maxRes.getMemory() ) {
- failSessionDuringDeployment();
- throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
- }
-
- final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
- "connecting from the beginning because the resources are currently not available in the cluster. " +
- "The allocation might take more time than usual because the Flink YARN client needs to wait until " +
- "the resources become available.";
- int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
- ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
- if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
- LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
- + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
-
- }
- if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
- LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
- }
- if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
- LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
- }
-
- // ----------------- check if the requested containers fit into the cluster.
-
- int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
- // first, allocate the jobManager somewhere.
- if(!allocateResource(nmFree, jobManagerMemoryMb)) {
- LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
- "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
- Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
- }
- // allocate TaskManagers
- for(int i = 0; i < taskManagerCount; i++) {
- if(!allocateResource(nmFree, taskManagerMemoryMb)) {
- LOG.warn("There is not enough memory available in the YARN cluster. " +
- "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
- "NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
- "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
- "the following NodeManagers are available: " + Arrays.toString(nmFree) + NOTE_RSC );
- }
- }
-
- // ------------------ Prepare Application Master Container ------------------------------
-
- // respect custom JVM options in the YAML file
- final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-
- String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
- boolean hasLogback = new File(logbackFile).exists();
- String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
-
- boolean hasLog4j = new File(log4jFile).exists();
- if(hasLogback) {
- shipFiles.add(new File(logbackFile));
- }
- if(hasLog4j) {
- shipFiles.add(new File(log4jFile));
- }
-
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
-
- String amCommand = "$JAVA_HOME/bin/java"
- + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration)
- + "M " + javaOpts;
-
- if(hasLogback || hasLog4j) {
- amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
-
- if(hasLogback) {
- amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
- }
-
- if(hasLog4j) {
- amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
- }
- }
-
- amCommand += " " + getApplicationMasterClass().getName() + " "
- + " 1>"
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"
- + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
- amContainer.setCommands(Collections.singletonList(amCommand));
-
- LOG.debug("Application Master start command: " + amCommand);
-
- // intialize HDFS
- // Copy the application master jar to the filesystem
- // Create a local resource to point to the destination jar path
- final FileSystem fs = FileSystem.get(conf);
-
- // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
- if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
- fs.getScheme().startsWith("file")) {
- LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
- + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
- + "The Flink YARN client needs to store its files in a distributed file system");
- }
-
- // Set-up ApplicationSubmissionContext for the application
- ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
-
- if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
- // activate re-execution of failed applications
- appContext.setMaxAppAttempts(
- flinkConfiguration.getInteger(
- ConfigConstants.YARN_APPLICATION_ATTEMPTS,
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
-
- activateHighAvailabilitySupport(appContext);
- } else {
- // set number of application retries to 1 in the default case
- appContext.setMaxAppAttempts(
- flinkConfiguration.getInteger(
- ConfigConstants.YARN_APPLICATION_ATTEMPTS,
- 1));
- }
-
- final ApplicationId appId = appContext.getApplicationId();
-
- // Setup jar for ApplicationMaster
- LocalResource appMasterJar = Records.newRecord(LocalResource.class);
- LocalResource flinkConf = Records.newRecord(LocalResource.class);
- Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
- Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
- Map<String, LocalResource> localResources = new HashMap<>(2);
- localResources.put("flink.jar", appMasterJar);
- localResources.put("flink-conf.yaml", flinkConf);
-
-
- // setup security tokens (code from apache storm)
- final Path[] paths = new Path[2 + shipFiles.size()];
- StringBuilder envShipFileList = new StringBuilder();
- // upload ship files
- for (int i = 0; i < shipFiles.size(); i++) {
- File shipFile = shipFiles.get(i);
- LocalResource shipResources = Records.newRecord(LocalResource.class);
- Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
- paths[2 + i] = Utils.setupLocalResource(fs, appId.toString(),
- shipLocalPath, shipResources, fs.getHomeDirectory());
- localResources.put(shipFile.getName(), shipResources);
-
- envShipFileList.append(paths[2 + i]);
- if(i+1 < shipFiles.size()) {
- envShipFileList.append(',');
- }
- }
-
- paths[0] = remotePathJar;
- paths[1] = remotePathConf;
- sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
-
- FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
- fs.setPermission(sessionFilesDir, permission); // set permission for path.
-
- Utils.setTokensFor(amContainer, paths, conf);
-
- amContainer.setLocalResources(localResources);
- fs.close();
-
- // Setup CLASSPATH for ApplicationMaster
- Map<String, String> appMasterEnv = new HashMap<>();
- // set user specified app master environment variables
- appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration));
- // set classpath from YARN configuration
- Utils.setupEnv(conf, appMasterEnv);
- // set Flink on YARN internal configuration values
- appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
- appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
- appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString() );
- appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
- appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
- appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
- appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
- appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
- appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
-
- if(dynamicPropertiesEncoded != null) {
- appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
- }
-
- amContainer.setEnvironment(appMasterEnv);
-
- // Set up resource type requirements for ApplicationMaster
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(jobManagerMemoryMb);
- capability.setVirtualCores(1);
-
- String name;
- if(customName == null) {
- name = "Flink session with " + taskManagerCount + " TaskManagers";
- if(detached) {
- name += " (detached)";
- }
- } else {
- name = customName;
- }
-
- appContext.setApplicationName(name); // application name
- appContext.setApplicationType("Apache Flink");
- appContext.setAMContainerSpec(amContainer);
- appContext.setResource(capability);
- if(yarnQueue != null) {
- appContext.setQueue(yarnQueue);
- }
-
- // add a hook to clean up in case deployment fails
- Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
- LOG.info("Submitting application master " + appId);
- yarnClient.submitApplication(appContext);
-
- LOG.info("Waiting for the cluster to be allocated");
- int waittime = 0;
- loop: while( true ) {
- ApplicationReport report;
- try {
- report = yarnClient.getApplicationReport(appId);
- } catch (IOException e) {
- throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
- }
- YarnApplicationState appState = report.getYarnApplicationState();
- switch(appState) {
- case FAILED:
- case FINISHED:
- case KILLED:
- throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
- + appState + " during deployment. \n" +
- "Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
- "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
- "yarn logs -applicationId " + appId);
- //break ..
- case RUNNING:
- LOG.info("YARN application has been deployed successfully.");
- break loop;
- default:
- LOG.info("Deploying cluster, current state " + appState);
- if(waittime > 60000) {
- LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
- }
-
- }
- waittime += 1000;
- Thread.sleep(1000);
- }
- // print the application id for user to cancel themselves.
- if (isDetached()) {
- LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
- "Flink on YARN, use the following command or a YARN web interface to stop " +
- "it:\nyarn application -kill " + appId + "\nPlease also note that the " +
- "temporary files of the YARN session in the home directory will not be removed.");
- }
- // since deployment was successful, remove the hook
- try {
- Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
- } catch (IllegalStateException e) {
- // we're already in the shut down hook.
- }
- // the Flink cluster is deployed in YARN. Represent cluster
- return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir, detached);
- }
-
- /**
- * Kills YARN application and stops YARN client.
- *
- * Use this method to kill the App before it has been properly deployed
- */
- private void failSessionDuringDeployment() {
- LOG.info("Killing YARN application");
-
- try {
- yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
- } catch (Exception e) {
- // we only log a debug message here because the "killApplication" call is a best-effort
- // call (we don't know if the application has been deployed when the error occurred).
- LOG.debug("Error while killing YARN application", e);
- }
- yarnClient.stop();
- }
-
-
- private static class ClusterResourceDescription {
- final public int totalFreeMemory;
- final public int containerLimit;
- final public int[] nodeManagersFree;
-
- public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
- this.totalFreeMemory = totalFreeMemory;
- this.containerLimit = containerLimit;
- this.nodeManagersFree = nodeManagersFree;
- }
- }
-
- private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
- List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-
- int totalFreeMemory = 0;
- int containerLimit = 0;
- int[] nodeManagersFree = new int[nodes.size()];
-
- for(int i = 0; i < nodes.size(); i++) {
- NodeReport rep = nodes.get(i);
- int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
- nodeManagersFree[i] = free;
- totalFreeMemory += free;
- if(free > containerLimit) {
- containerLimit = free;
- }
- }
- return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
- }
-
- @Override
- public String getClusterDescription() throws Exception {
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
-
- YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
-
- ps.append("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
- List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
- final String format = "|%-16s |%-16s %n";
- ps.printf("|Property |Value %n");
- ps.println("+---------------------------------------+");
- int totalMemory = 0;
- int totalCores = 0;
- for(NodeReport rep : nodes) {
- final Resource res = rep.getCapability();
- totalMemory += res.getMemory();
- totalCores += res.getVirtualCores();
- ps.format(format, "NodeID", rep.getNodeId());
- ps.format(format, "Memory", res.getMemory() + " MB");
- ps.format(format, "vCores", res.getVirtualCores());
- ps.format(format, "HealthReport", rep.getHealthReport());
- ps.format(format, "Containers", rep.getNumContainers());
- ps.println("+---------------------------------------+");
- }
- ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
- List<QueueInfo> qInfo = yarnClient.getAllQueues();
- for(QueueInfo q : qInfo) {
- ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
- q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
- }
- yarnClient.stop();
- return baos.toString();
- }
-
- @Override
- public String getSessionFilesDir() {
- return sessionFilesDir.toString();
- }
-
- @Override
- public void setName(String name) {
- if(name == null) {
- throw new IllegalArgumentException("The passed name is null");
- }
- customName = name;
- }
-
- private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
- ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
-
- reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
- reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis());
- }
-
- /**
- * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
- * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval
- * methods. Depending on the Hadoop version these methods are supported or not. If the methods
- * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or
- * setAttemptFailuresValidityInterval are called.
- */
- private static class ApplicationSubmissionContextReflector {
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
-
- private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
-
- public static ApplicationSubmissionContextReflector getInstance() {
- return instance;
- }
-
- private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
- private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
-
- private final Method keepContainersMethod;
- private final Method attemptFailuresValidityIntervalMethod;
-
- private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
- Method keepContainersMethod;
- Method attemptFailuresValidityIntervalMethod;
-
- try {
- // this method is only supported by Hadoop 2.4.0 onwards
- keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class);
- LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName);
- } catch (NoSuchMethodException e) {
- LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName);
- // assign null because the Hadoop version apparently does not support this call.
- keepContainersMethod = null;
- }
-
- this.keepContainersMethod = keepContainersMethod;
-
- try {
- // this method is only supported by Hadoop 2.6.0 onwards
- attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class);
- LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
- } catch (NoSuchMethodException e) {
- LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
- // assign null because the Hadoop version apparently does not support this call.
- attemptFailuresValidityIntervalMethod = null;
- }
-
- this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod;
- }
-
- public void setKeepContainersAcrossApplicationAttempts(
- ApplicationSubmissionContext appContext,
- boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
-
- if (keepContainersMethod != null) {
- LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
- appContext.getClass().getCanonicalName());
- keepContainersMethod.invoke(appContext, keepContainers);
- } else {
- LOG.debug("{} does not support method {}. Doing nothing.",
- appContext.getClass().getCanonicalName(), keepContainersMethodName);
- }
- }
-
- public void setAttemptFailuresValidityInterval(
- ApplicationSubmissionContext appContext,
- long validityInterval) throws InvocationTargetException, IllegalAccessException {
- if (attemptFailuresValidityIntervalMethod != null) {
- LOG.debug("Calling method {} of {}.",
- attemptFailuresValidityIntervalMethod.getName(),
- appContext.getClass().getCanonicalName());
- attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval);
- } else {
- LOG.debug("{} does not support method {}. Doing nothing.",
- appContext.getClass().getCanonicalName(),
- attemptsFailuresValidityIntervalMethodName);
- }
- }
- }
-
- public static class YarnDeploymentException extends RuntimeException {
- private static final long serialVersionUID = -812040641215388943L;
-
- public YarnDeploymentException() {
- }
-
- public YarnDeploymentException(String message) {
- super(message);
- }
-
- public YarnDeploymentException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- private class DeploymentFailureHook extends Thread {
- @Override
- public void run() {
- LOG.info("Cancelling deployment from Deployment Failure Hook");
- failSessionDuringDeployment();
- LOG.info("Deleting files in " + sessionFilesDir);
- try {
- FileSystem fs = FileSystem.get(conf);
- fs.delete(sessionFilesDir, true);
- fs.close();
- } catch (IOException e) {
- LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
- }
- }
- }
-}
-