You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:05 UTC
[46/52] [abbrv] flink git commit: [FLINK-4928] [yarn] Implement
FLIP-6 YARN Application Master Runner
[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0113e5a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0113e5a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0113e5a4
Branch: refs/heads/master
Commit: 0113e5a467858f9cd435e80df2c2626170e5de62
Parents: e28b116
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Thu Nov 3 16:24:47 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:27 2016 +0100
----------------------------------------------------------------------
...bstractYarnFlinkApplicationMasterRunner.java | 213 +++++++++++++
.../yarn/YarnFlinkApplicationMasterRunner.java | 316 +++++++++++++++++++
2 files changed, 529 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0113e5a4/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..923694e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for the {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and the {@link YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public abstract class AbstractYarnFlinkApplicationMasterRunner {
+
+ /** Logger */
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractYarnFlinkApplicationMasterRunner.class);
+
+ /** The process environment variables */
+ protected static final Map<String, String> ENV = System.getenv();
+
+ /** The exit code returned if the initialization of the application master failed */
+ protected static final int INIT_ERROR_EXIT_CODE = 31;
+
+ /** The host name passed by env */
+ protected String appMasterHostname;
+
+ /**
+ * The instance entry point for the YARN application master. Reads the launch parameters from the
+ * process environment, builds the Flink configuration, installs the security context and calls
+ * the main work method {@link #runApplicationMaster(org.apache.flink.configuration.Configuration)} as a
+ * privileged action.
+ *
+ * <p>Every {@link Throwable} raised in this method is logged and turned into
+ * {@link #INIT_ERROR_EXIT_CODE}; nothing is propagated to the caller.
+ *
+ * @param args The command line arguments (not evaluated by this method).
+ * @return The process exit code.
+ */
+ protected int run(String[] args) {
+ try {
+ LOG.debug("All environment variables: {}", ENV);
+
+ final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+ // Preconditions message templates use %s placeholders, not the SLF4J-style {}
+ Preconditions.checkArgument(yarnClientUsername != null, "YARN client user name environment variable %s not set",
+ YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+ final String currDir = ENV.get(Environment.PWD.key());
+ Preconditions.checkArgument(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+ LOG.debug("Current working directory: {}", currDir);
+
+ final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+ LOG.debug("Remote keytab path obtained {}", remoteKeytabPath);
+
+ final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ LOG.info("Remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+ // if a remote keytab path is set, the keytab file is addressed under its fixed
+ // name (Utils.KEYTAB_FILE_NAME) inside the current working directory
+ String keytabPath = null;
+ if (remoteKeytabPath != null) {
+ File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+ keytabPath = f.getAbsolutePath();
+ LOG.debug("Keytab path: {}", keytabPath);
+ }
+
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+ LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+ currentUser.getShortUserName(), yarnClientUsername);
+
+ SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+ //To support Yarn Secure Integration Test Scenario
+ File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+ if (krb5Conf.exists() && krb5Conf.canRead()) {
+ String krb5Path = krb5Conf.getAbsolutePath();
+ LOG.info("KRB5 Conf: {}", krb5Path);
+ org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+ sc.setHadoopConfiguration(conf);
+ }
+
+ // Flink configuration
+ final Map<String, String> dynamicProperties =
+ FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+ LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+
+ final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+ // keytab and principal are only written to the configuration when both are available
+ if (keytabPath != null && remoteKeytabPrincipal != null) {
+ flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+ flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ }
+
+ SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+ // Note that we use the "appMasterHostname" given by YARN here, to make sure
+ // we use the hostnames given by YARN consistently throughout akka.
+ // for akka "localhost" and "localhost.localdomain" are different actors.
+ this.appMasterHostname = ENV.get(Environment.NM_HOST.key());
+ Preconditions.checkArgument(appMasterHostname != null,
+ "ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
+ LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
+
+ return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+ @Override
+ public Integer run() {
+ return runApplicationMaster(flinkConfig);
+ }
+ });
+
+ }
+ catch (Throwable t) {
+ // make sure that any failure, whatever it is, ends up in the log
+ LOG.error("YARN Application Master initialization failed", t);
+ return INIT_ERROR_EXIT_CODE;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Core work method
+ // ------------------------------------------------------------------------
+
+ /**
+ * The main work method, must run as a privileged action. It is invoked by
+ * {@link #run(String[])} through the installed security context.
+ *
+ * @param config The Flink configuration assembled by {@link #run(String[])}.
+ * @return The return code for the Java process.
+ */
+ protected abstract int runApplicationMaster(Configuration config);
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+ /**
+ * @param baseDirectory The working directory
+ * @param additional Additional parameters
+ *
+ * @return The configuration to be used by the TaskExecutors.
+ */
+ private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
+ LOG.info("Loading config from directory {}.", baseDirectory);
+
+ Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
+
+ configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+
+ // add dynamic properties to JobManager configuration.
+ for (Map.Entry<String, String> property : additional.entrySet()) {
+ configuration.setString(property.getKey(), property.getValue());
+ }
+
+ // override zookeeper namespace with user cli argument (if provided)
+ String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
+ if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
+ configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
+ }
+
+ // if a web monitor shall be started, set the port to random binding
+ if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+ configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+ }
+
+ // if the user has set the deprecated YARN-specific config keys, we add the
+ // corresponding generic config keys instead. that way, later code needs not
+ // deal with deprecated config keys
+
+ BootstrapTools.substituteDeprecatedConfigKey(configuration,
+ ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+
+ BootstrapTools.substituteDeprecatedConfigKey(configuration,
+ ConfigConstants.YARN_HEAP_CUTOFF_MIN,
+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
+
+ BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+ ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
+ ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+
+ BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+ ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
+ ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+
+ return configuration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0113e5a4/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..e58c77e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for the {@link org.apache.flink.runtime.jobmaster.JobManagerRunner}
+ * and the {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobManagerRunner starts a {@link org.apache.flink.runtime.jobmaster.JobMaster}.
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicationMasterRunner
+ implements OnCompletionActions, FatalErrorHandler {
+
+ /** Logger */
+ protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+ /** The job graph file path */
+ private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+ /** The lock to guard startup / shutdown / manipulation methods */
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private MetricRegistry metricRegistry;
+
+ @GuardedBy("lock")
+ private HighAvailabilityServices haServices;
+
+ @GuardedBy("lock")
+ private RpcService commonRpcService;
+
+ @GuardedBy("lock")
+ private ResourceManager resourceManager;
+
+ @GuardedBy("lock")
+ private JobManagerRunner jobManagerRunner;
+
+ @GuardedBy("lock")
+ private JobGraph jobGraph;
+
+ // ------------------------------------------------------------------------
+ // Program entry point
+ // ------------------------------------------------------------------------
+
+ /**
+ * The entry point for the YARN application master. Logs the environment information,
+ * registers the signal handler and the JVM shutdown safeguard, runs the application
+ * master and terminates the JVM with the code it returned.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args) {
+ EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster runner", args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+ // the exit code of the process is whatever the runner reports
+ final YarnFlinkApplicationMasterRunner runner = new YarnFlinkApplicationMasterRunner();
+ System.exit(runner.run(args));
+ }
+
+ /**
+ * Brings up the application master services and blocks until the resource manager terminates.
+ *
+ * <p>Under the lock, the high-availability services, the metric registry, the RPC service, the
+ * resource manager and the job manager runner are created, and the latter two are started.
+ * Any {@link Throwable} is logged, triggers {@link #shutdown(ApplicationStatus, String)} with
+ * status FAILED and yields {@link #INIT_ERROR_EXIT_CODE}.
+ *
+ * @param config The Flink configuration.
+ * @return 0 once the resource manager's termination future completed normally,
+ * {@link #INIT_ERROR_EXIT_CODE} on any failure.
+ */
+ @Override
+ protected int runApplicationMaster(Configuration config) {
+
+ try {
+ // the rpc service is started on a port taken from the
+ // port range definition in the config.
+ final String rpcPortRange = config.getString(
+ ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+ ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+ synchronized (lock) {
+ // ---- (1) common services
+ haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+ metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+ commonRpcService = createRpcService(config, appMasterHostname, rpcPortRange);
+
+ // ---- (2) resource manager
+ resourceManager = createResourceManager(config);
+
+ // ---- (3) job manager runner
+ jobManagerRunner = createJobManagerRunner(config);
+
+ // ---- (4) start both components, resource manager first
+ resourceManager.start();
+ LOG.debug("YARN Flink Resource Manager started");
+
+ jobManagerRunner.start();
+ LOG.debug("Job Manager Runner started");
+
+ // ---- (5) start the web monitor
+ // TODO: add web monitor
+ }
+
+ // everything is started; block until the resource manager has terminated
+ resourceManager.getTerminationFuture().get();
+ LOG.info("YARN Application Master finished");
+ return 0;
+ }
+ catch (Throwable failure) {
+ // make sure that any failure, whatever it is, ends up in the log
+ LOG.error("YARN Application Master initialization failed", failure);
+ shutdown(ApplicationStatus.FAILED, failure.getMessage());
+ return INIT_ERROR_EXIT_CODE;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ protected RpcService createRpcService(
+ Configuration configuration,
+ String bindAddress,
+ String portRange) throws Exception{
+ ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
+ FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+ return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
+ }
+
+ /**
+ * Creates the {@link YarnResourceManager} from the configuration, the process environment and
+ * the already created RPC service, high-availability services and metric registry. This
+ * runner is passed as the last constructor argument.
+ *
+ * @param config The Flink configuration.
+ * @return The new, not yet started resource manager.
+ * @throws ConfigurationException If the resource manager configuration cannot be derived from the config.
+ */
+ private ResourceManager createResourceManager(Configuration config) throws ConfigurationException {
+ final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
+
+ return new YarnResourceManager(
+ config,
+ ENV,
+ commonRpcService,
+ rmConfiguration,
+ haServices,
+ new DefaultSlotManager.Factory(),
+ metricRegistry,
+ new JobLeaderIdService(haServices),
+ this);
+ }
+
+ private JobManagerRunner createJobManagerRunner(Configuration config) throws Exception{
+ // first get JobGraph from local resources
+ //TODO: generate the job graph from user's jar
+ jobGraph = loadJobGraph(config);
+
+ // we first need to mark the job as running in the HA services, so that the
+ // JobManager leader will recognize that it as work to do
+ try {
+ haServices.getRunningJobsRegistry().setJobRunning(jobGraph.getJobID());
+ }
+ catch (Throwable t) {
+ throw new JobExecutionException(jobGraph.getJobID(),
+ "Could not register the job at the high-availability services", t);
+ }
+
+ // now the JobManagerRunner
+ return new JobManagerRunner(
+ jobGraph, config,
+ commonRpcService,
+ haServices,
+ this,
+ this);
+ }
+
+ /**
+ * Stops all services of this application master. Under the lock, every component that has been
+ * created is stopped in turn: job manager runner, resource manager, RPC service,
+ * high-availability services and metric registry. A failure to stop one component is logged
+ * and does not prevent stopping the remaining ones.
+ *
+ * @param status The final application status handed to the resource manager.
+ * @param msg A diagnostic message handed to the resource manager, may be null.
+ */
+ protected void shutdown(ApplicationStatus status, String msg) {
+ synchronized (lock) {
+ // 1) job manager runner
+ if (jobManagerRunner != null) {
+ try {
+ jobManagerRunner.shutdown();
+ } catch (Throwable t) {
+ LOG.warn("Failed to stop the JobManagerRunner", t);
+ }
+ }
+ // 2) resource manager: report the final status first, then stop it
+ if (resourceManager != null) {
+ try {
+ resourceManager.shutDownCluster(status, msg);
+ resourceManager.shutDown();
+ } catch (Throwable t) {
+ LOG.warn("Failed to stop the ResourceManager", t);
+ }
+ }
+ // 3) rpc service
+ if (commonRpcService != null) {
+ try {
+ commonRpcService.stopService();
+ } catch (Throwable t) {
+ LOG.error("Error shutting down resource manager rpc service", t);
+ }
+ }
+ // 4) high-availability services
+ if (haServices != null) {
+ try {
+ haServices.shutdown();
+ } catch (Throwable t) {
+ LOG.warn("Failed to stop the HA service", t);
+ }
+ }
+ // 5) metric registry
+ if (metricRegistry != null) {
+ try {
+ metricRegistry.shutdown();
+ } catch (Throwable t) {
+ LOG.warn("Failed to stop the metrics registry", t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Loads the serialized {@link JobGraph} from a local file. The file path is read from the
+ * configuration under the key {@link #JOB_GRAPH_FILE_PATH} and defaults to {@code "job.graph"}.
+ *
+ * @param config The configuration from which the job graph file path is read.
+ * @return The deserialized job graph, never {@code null}.
+ * @throws Exception If the path does not denote a regular file, if the file contains no job
+ * graph, or if reading or deserializing the file fails.
+ */
+ private static JobGraph loadJobGraph(Configuration config) throws Exception {
+ JobGraph jg = null;
+ String jobGraphFile = config.getString(JOB_GRAPH_FILE_PATH, "job.graph");
+ if (jobGraphFile != null) {
+ File fp = new File(jobGraphFile);
+ if (fp.isFile()) {
+ // NOTE(review): this uses Java native deserialization; presumably the file is the
+ // trusted one shipped with the application - confirm.
+ // try-with-resources closes both streams, also when deserialization fails
+ try (FileInputStream input = new FileInputStream(fp);
+ ObjectInputStream obInput = new ObjectInputStream(input)) {
+ jg = (JobGraph) obInput.readObject();
+ }
+ }
+ }
+ if (jg == null) {
+ throw new Exception("Fail to load job graph " + jobGraphFile);
+ }
+ return jg;
+ }
+
+ //-------------------------------------------------------------------------------------
+ // Fatal error handler
+ //-------------------------------------------------------------------------------------
+
+ /**
+ * Fatal error notification. Logs the error and shuts down this application master with
+ * status FAILED, using the exception's message as the diagnostic message.
+ *
+ * @param exception The fatal error; its message is read, so it must not be null.
+ */
+ @Override
+ public void onFatalError(Throwable exception) {
+ LOG.error("Encountered fatal error.", exception);
+
+ shutdown(ApplicationStatus.FAILED, exception.getMessage());
+ }
+
+ //----------------------------------------------------------------------------------------------
+ // Result and error handling methods
+ //----------------------------------------------------------------------------------------------
+
+ /**
+ * Job completion notification triggered by JobManager. Shuts down this application master
+ * with status SUCCEEDED and no diagnostic message.
+ *
+ * @param result The result of the finished job (not evaluated here).
+ */
+ @Override
+ public void jobFinished(JobExecutionResult result) {
+ shutdown(ApplicationStatus.SUCCEEDED, null);
+ }
+
+ /**
+ * Job failure notification triggered by JobManager. Shuts down this application master
+ * with status FAILED, using the message of the cause as the diagnostic message.
+ *
+ * @param cause The failure cause; its message is read, so it must not be null.
+ */
+ @Override
+ public void jobFailed(Throwable cause) {
+ shutdown(ApplicationStatus.FAILED, cause.getMessage());
+ }
+
+ /**
+ * Job completion notification for a job that was finished by another party. Shuts down
+ * this application master with status UNKNOWN and no diagnostic message.
+ *
+ * NOTE(review): the previous comment read "triggered by self", which contradicts the
+ * method name - confirm the intended meaning against {@link OnCompletionActions}.
+ */
+ @Override
+ public void jobFinishedByOther() {
+ shutdown(ApplicationStatus.UNKNOWN, null);
+ }
+}