You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/28 13:37:48 UTC

flink git commit: [FLINK-7108] [yarn] Add YARN entry points based on the generic entry point

Repository: flink
Updated Branches:
  refs/heads/master 219ae33d3 -> b1f37efb3


[FLINK-7108] [yarn] Add YARN entry points based on the generic entry point

Add the YarnSessionClusterEntrypoint and the YarnJobClusterEntrypoint which extend
SessionClusterEntrypoint and JobClusterEntrypoint, respectively.

Add new Yarn session and per-job cluster entry points

Remove old Flip-6 Yarn per job entry point

This closes #4281.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1f37efb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1f37efb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1f37efb

Branch: refs/heads/master
Commit: b1f37efb3b49ad73bf31d0bbf9b1337439de722c
Parents: 219ae33
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 5 17:16:52 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 28 15:36:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/entrypoint/ClusterEntrypoint.java   |  37 +-
 .../entrypoint/JobClusterEntrypoint.java        |  21 +-
 .../entrypoint/SessionClusterEntrypoint.java    |   4 +
 .../StandaloneSessionClusterEntrypoint.java     |  10 +-
 ...bstractYarnFlinkApplicationMasterRunner.java | 220 ------------
 .../flink/yarn/YarnClusterDescriptorV2.java     |   3 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  | 344 -------------------
 .../yarn/entrypoint/YarnEntrypointUtils.java    | 160 +++++++++
 .../entrypoint/YarnJobClusterEntrypoint.java    | 141 ++++++++
 .../YarnSessionClusterEntrypoint.java           | 115 +++++++
 10 files changed, 467 insertions(+), 588 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index fa866e4..98348ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
@@ -66,6 +67,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	/** The lock to guard startup / shutdown / manipulation methods. */
 	private final Object lock = new Object();
 
+	private final Configuration configuration;
+
 	@GuardedBy("lock")
 	private MetricRegistry metricRegistry = null;
 
@@ -81,10 +84,12 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	@GuardedBy("lock")
 	private RpcService commonRpcService = null;
 
-	protected void startCluster(String[] args) {
-		final ClusterConfiguration clusterConfiguration = parseArguments(args);
+	protected ClusterEntrypoint(Configuration configuration) {
+		this.configuration = Preconditions.checkNotNull(configuration);
+	}
 
-		final Configuration configuration = loadConfiguration(clusterConfiguration);
+	protected void startCluster() {
+		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
 			SecurityContext securityContext = installSecurityContext(configuration);
@@ -110,19 +115,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		}
 	}
 
-	protected ClusterConfiguration parseArguments(String[] args) {
-		ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-		final String configDir = parameterTool.get("configDir", "");
-
-		return new ClusterConfiguration(configDir);
-	}
-
-	protected Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
-		return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir());
-	}
-
 	protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
+		LOG.info("Install security context.");
+
 		SecurityUtils.install(new SecurityUtils.SecurityConfiguration(configuration));
 
 		return SecurityUtils.getInstalledContext();
@@ -244,4 +239,16 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		BlobServer blobServer,
 		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry) throws Exception;
+
+	protected static ClusterConfiguration parseArguments(String[] args) {
+		ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		final String configDir = parameterTool.get("configDir", "");
+
+		return new ClusterConfiguration(configDir);
+	}
+
+	protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
+		return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 4133f07..8728186 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -45,6 +46,10 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 
 	private JobManagerRunner jobManagerRunner;
 
+	public JobClusterEntrypoint(Configuration configuration) {
+		super(configuration);
+	}
+
 	@Override
 	protected void startClusterComponents(
 			Configuration configuration,
@@ -60,7 +65,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			rpcService,
 			highAvailabilityServices,
 			heartbeatServices,
-			metricRegistry);
+			metricRegistry,
+			this);
 
 		jobManagerRunner = createJobManagerRunner(
 			configuration,
@@ -69,7 +75,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			highAvailabilityServices,
 			blobServer,
 			heartbeatServices,
-			metricRegistry);
+			metricRegistry,
+			this);
 
 		LOG.debug("Starting ResourceManager.");
 		resourceManager.start();
@@ -85,7 +92,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			HighAvailabilityServices highAvailabilityServices,
 			BlobService blobService,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry) throws Exception {
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
 
 		JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -99,7 +107,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			heartbeatServices,
 			metricRegistry,
 			new TerminatingOnCompleteActions(jobGraph.getJobID()),
-			this);
+			fatalErrorHandler);
 	}
 
 	@Override
@@ -149,9 +157,10 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
 		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry) throws Exception;
+		MetricRegistry metricRegistry,
+		FatalErrorHandler fatalErrorHandler) throws Exception;
 
-	protected abstract JobGraph retrieveJobGraph(Configuration configuration);
+	protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
 
 	private final class TerminatingOnCompleteActions implements OnCompletionActions {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 4c7df1b..4013e83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -41,6 +41,10 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 
 	private Dispatcher dispatcher;
 
+	public SessionClusterEntrypoint(Configuration configuration) {
+		super(configuration);
+	}
+
 	@Override
 	protected void startClusterComponents(
 			Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index 025f128..6748b61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -37,6 +37,10 @@ import org.apache.flink.runtime.rpc.RpcService;
  */
 public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {
 
+	public StandaloneSessionClusterEntrypoint(Configuration configuration) {
+		super(configuration);
+	}
+
 	@Override
 	protected ResourceManager<?> createResourceManager(
 			Configuration configuration,
@@ -67,8 +71,10 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 	}
 
 	public static void main(String[] args) {
-		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint();
+		Configuration configuration = loadConfiguration(parseArguments(args));
+
+		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
 
-		entrypoint.startCluster(args);
+		entrypoint.startCluster();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
deleted file mode 100644
index 85b51a3..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
-
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-/**
- * This class is the executable entry point for the YARN application master.
- * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
- * and {@link YarnResourceManager}.
- *
- * <p>The JobMasters handles Flink job execution, while the YarnResourceManager handles container
- * allocation and failure detection.
- */
-public abstract class AbstractYarnFlinkApplicationMasterRunner {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(AbstractYarnFlinkApplicationMasterRunner.class);
-
-	/** The process environment variables. */
-	protected static final Map<String, String> ENV = System.getenv();
-
-	/** The exit code returned if the initialization of the application master failed. */
-	protected static final int INIT_ERROR_EXIT_CODE = 31;
-
-	/** The host name passed by env. */
-	protected String appMasterHostname;
-
-	/**
-	 * The instance entry point for the YARN application master. Obtains user group
-	 * information and calls the main work method {@link #runApplicationMaster(org.apache.flink.configuration.Configuration)} as a
-	 * privileged action.
-	 *
-	 * @param args The command line arguments.
-	 * @return The process exit code.
-	 */
-	protected int run(String[] args) {
-		try {
-			LOG.debug("All environment variables: {}", ENV);
-
-			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-			Preconditions.checkArgument(yarnClientUsername != null, "YARN client user name environment variable {} not set",
-				YarnConfigKeys.ENV_HADOOP_USER_NAME);
-
-			final String currDir = ENV.get(Environment.PWD.key());
-			Preconditions.checkArgument(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
-			LOG.debug("Current working directory: {}", currDir);
-
-			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-			LOG.debug("Remote keytab path obtained {}", remoteKeytabPath);
-
-			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-			LOG.info("Remote keytab principal obtained {}", remoteKeytabPrincipal);
-
-			String keytabPath = null;
-			if (remoteKeytabPath != null) {
-				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
-				keytabPath = f.getAbsolutePath();
-				LOG.debug("Keytab path: {}", keytabPath);
-			}
-
-			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-
-			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
-					currentUser.getShortUserName(), yarnClientUsername);
-
-			// Flink configuration
-			final Map<String, String> dynamicProperties =
-					FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
-
-			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
-			if (keytabPath != null && remoteKeytabPrincipal != null) {
-				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
-				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
-			}
-
-			org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
-
-			//To support Yarn Secure Integration Test Scenario
-			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
-			if (krb5Conf.exists() && krb5Conf.canRead()) {
-				String krb5Path = krb5Conf.getAbsolutePath();
-				LOG.info("KRB5 Conf: {}", krb5Path);
-				hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-			}
-
-			SecurityUtils.SecurityConfiguration sc;
-			if (hadoopConfiguration != null) {
-				sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
-			} else {
-				sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
-			}
-
-			SecurityUtils.install(sc);
-
-			// Note that we use the "appMasterHostname" given by YARN here, to make sure
-			// we use the hostnames given by YARN consistently throughout akka.
-			// for akka "localhost" and "localhost.localdomain" are different actors.
-			this.appMasterHostname = ENV.get(Environment.NM_HOST.key());
-			Preconditions.checkArgument(appMasterHostname != null,
-					"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
-			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
-
-			return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
-				@Override
-				public Integer call() throws Exception {
-					return runApplicationMaster(flinkConfig);
-				}
-			});
-
-		}
-		catch (Throwable t) {
-			// make sure that everything whatever ends up in the log
-			LOG.error("YARN Application Master initialization failed", t);
-			return INIT_ERROR_EXIT_CODE;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Core work method
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The main work method, must run as a privileged action.
-	 *
-	 * @return The return code for the Java process.
-	 */
-	protected abstract int runApplicationMaster(Configuration config);
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	/**
-	 * @param baseDirectory  The working directory
-	 * @param additional Additional parameters
-	 *
-	 * @return The configuration to be used by the TaskExecutors.
-	 */
-	private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
-		LOG.info("Loading config from directory {}.", baseDirectory);
-
-		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
-
-		// add dynamic properties to JobManager configuration.
-		for (Map.Entry<String, String> property : additional.entrySet()) {
-			configuration.setString(property.getKey(), property.getValue());
-		}
-
-		// override zookeeper namespace with user cli argument (if provided)
-		String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
-		if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
-			configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
-		}
-
-		// if a web monitor shall be started, set the port to random binding
-		if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
-			configuration.setInteger(JobManagerOptions.WEB_PORT, 0);
-		}
-
-		// if the user has set the deprecated YARN-specific config keys, we add the
-		// corresponding generic config keys instead. that way, later code needs not
-		// deal with deprecated config keys
-
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
-		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
-			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
-
-		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
-			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
-
-		return configuration;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index 6a43374..dd60f53 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 
 /**
  * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the new application master for a job under flip-6.
@@ -35,7 +36,7 @@ public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
 
 	@Override
 	protected Class<?> getApplicationMasterClass() {
-		return YarnFlinkApplicationMasterRunner.class;
+		return YarnSessionClusterEntrypoint.class;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
deleted file mode 100644
index 1f5af17..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
-
-import akka.actor.ActorSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * This class is the executable entry point for the YARN Application Master that
- * executes a single Flink job and then shuts the YARN application down.
- *
- * <p>The lifetime of the YARN application bound to that of the Flink job. Other
- * YARN Application Master implementations are for example the YARN session.
- *
- * <p>It starts actor system and the actors for {@link JobManagerRunner}
- * and {@link YarnResourceManager}.
- *
- * <p>The JobManagerRunner start a {@link JobMaster} JobMaster handles Flink job
- * execution, while the YarnResourceManager handles container allocation and failure
- * detection.
- */
-public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicationMasterRunner
-		implements OnCompletionActions, FatalErrorHandler {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
-
-	/** The job graph file path. */
-	private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
-	// ------------------------------------------------------------------------
-
-	/** The lock to guard startup / shutdown / manipulation methods. */
-	private final Object lock = new Object();
-
-	@GuardedBy("lock")
-	private MetricRegistry metricRegistry;
-
-	@GuardedBy("lock")
-	private HighAvailabilityServices haServices;
-
-	@GuardedBy("lock")
-	private HeartbeatServices heartbeatServices;
-
-	@GuardedBy("lock")
-	private RpcService commonRpcService;
-
-	@GuardedBy("lock")
-	private BlobServer blobServer;
-
-	@GuardedBy("lock")
-	private ResourceManager resourceManager;
-
-	@GuardedBy("lock")
-	private JobManagerRunner jobManagerRunner;
-
-	// ------------------------------------------------------------------------
-	//  Program entry point
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The entry point for the YARN application master.
-	 *
-	 * @param args The command line arguments.
-	 */
-	public static void main(String[] args) {
-		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args);
-		SignalHandler.register(LOG);
-		JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
-		// run and exit with the proper return code
-		int returnCode = new YarnFlinkApplicationMasterRunner().run(args);
-		System.exit(returnCode);
-	}
-
-	@Override
-	protected int runApplicationMaster(Configuration config) {
-
-		try {
-			// ---- (1) create common services
-
-			// try to start the rpc service
-			// using the port range definition from the config.
-			final String amPortRange = config.getString(
-					ConfigConstants.YARN_APPLICATION_MASTER_PORT,
-					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
-
-			synchronized (lock) {
-				LOG.info("Starting High Availability Services");
-				commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
-
-				haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-					config,
-					commonRpcService.getExecutor(),
-					HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-				blobServer = new BlobServer(config, haServices.createBlobStore());
-
-				heartbeatServices = HeartbeatServices.fromConfiguration(config);
-
-				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-				// ---- (2) init resource manager -------
-				resourceManager = createResourceManager(config);
-
-				// ---- (3) init job master parameters
-				jobManagerRunner = createJobManagerRunner(config);
-
-				// ---- (4) start the resource manager  and job manager runner:
-				resourceManager.start();
-				LOG.debug("YARN Flink Resource Manager started");
-
-				jobManagerRunner.start();
-				LOG.debug("Job Manager Runner started");
-
-				// ---- (5) start the web monitor
-				// TODO: add web monitor
-			}
-
-			// wait for resource manager to finish
-			resourceManager.getTerminationFuture().get();
-			// everything started, we can wait until all is done or the process is killed
-			LOG.info("YARN Application Master finished");
-		}
-		catch (Throwable t) {
-			// make sure that everything whatever ends up in the log
-			LOG.error("YARN Application Master initialization failed", t);
-			shutdown(ApplicationStatus.FAILED, t.getMessage());
-			return INIT_ERROR_EXIT_CODE;
-		}
-
-		return 0;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	protected RpcService createRpcService(
-			Configuration configuration,
-			String bindAddress,
-			String portRange) throws Exception{
-		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
-		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
-		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
-	}
-
-	private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
-		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
-		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config);
-		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			resourceManagerRuntimeServicesConfiguration,
-			haServices,
-			commonRpcService.getScheduledExecutor());
-
-		return new YarnResourceManager(
-			commonRpcService,
-			FlinkResourceManager.RESOURCE_MANAGER_NAME,
-			ResourceID.generate(),
-			config,
-			ENV,
-			resourceManagerConfiguration,
-			haServices,
-			heartbeatServices,
-			resourceManagerRuntimeServices.getSlotManager(),
-			metricRegistry,
-			resourceManagerRuntimeServices.getJobLeaderIdService(),
-			this);
-	}
-
-	private JobManagerRunner createJobManagerRunner(Configuration config) throws Exception{
-		// first get JobGraph from local resources
-		//TODO: generate the job graph from user's jar
-		JobGraph jobGraph = loadJobGraph(config);
-
-		// now the JobManagerRunner
-		return new JobManagerRunner(
-			ResourceID.generate(),
-			jobGraph,
-			config,
-			commonRpcService,
-			haServices,
-			blobServer,
-			heartbeatServices,
-			this,
-			this);
-	}
-
-	protected void shutdown(ApplicationStatus status, String msg) {
-
-		synchronized (lock) {
-			if (jobManagerRunner != null) {
-				try {
-					jobManagerRunner.shutdown();
-				} catch (Throwable tt) {
-					LOG.warn("Failed to stop the JobManagerRunner", tt);
-				}
-			}
-			if (resourceManager != null) {
-				try {
-					resourceManager.shutDownCluster(status, msg);
-					resourceManager.shutDown();
-				} catch (Throwable tt) {
-					LOG.warn("Failed to stop the ResourceManager", tt);
-				}
-			}
-			if (commonRpcService != null) {
-				try {
-					commonRpcService.stopService();
-				} catch (Throwable tt) {
-					LOG.error("Error shutting down resource manager rpc service", tt);
-				}
-			}
-			if (haServices != null) {
-				try {
-					haServices.close();
-				} catch (Throwable tt) {
-					LOG.warn("Failed to stop the HA service", tt);
-				}
-			}
-			if (metricRegistry != null) {
-				try {
-					metricRegistry.shutdown();
-				} catch (Throwable tt) {
-					LOG.warn("Failed to stop the metrics registry", tt);
-				}
-			}
-		}
-	}
-
-	private static JobGraph loadJobGraph(Configuration config) throws Exception {
-		JobGraph jg = null;
-		String jobGraphFile = config.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-		if (jobGraphFile != null) {
-			File fp = new File(jobGraphFile);
-			if (fp.isFile()) {
-				try (FileInputStream input = new FileInputStream(fp);
-					ObjectInputStream obInput = new ObjectInputStream(input);) {
-					jg = (JobGraph) obInput.readObject();
-				} catch (IOException e) {
-					LOG.warn("Failed to read job graph file", e);
-				}
-			}
-		}
-		if (jg == null) {
-			throw new Exception("Fail to load job graph " + jobGraphFile);
-		}
-		return jg;
-	}
-
-	//-------------------------------------------------------------------------------------
-	// Fatal error handler
-	//-------------------------------------------------------------------------------------
-
-	@Override
-	public void onFatalError(Throwable exception) {
-		LOG.error("Encountered fatal error.", exception);
-
-		shutdown(ApplicationStatus.FAILED, exception.getMessage());
-	}
-
-	//----------------------------------------------------------------------------------------------
-	// Result and error handling methods
-	//----------------------------------------------------------------------------------------------
-
-	/**
-	 * Job completion notification triggered by JobManager.
-	 */
-	@Override
-	public void jobFinished(JobExecutionResult result) {
-		shutdown(ApplicationStatus.SUCCEEDED, null);
-	}
-
-	/**
-	 * Job completion notification triggered by JobManager.
-	 */
-	@Override
-	public void jobFailed(Throwable cause) {
-		shutdown(ApplicationStatus.FAILED, cause.getMessage());
-	}
-
-	/**
-	 * Job completion notification triggered by self.
-	 */
-	@Override
-	public void jobFinishedByOther() {
-		shutdown(ApplicationStatus.UNKNOWN, null);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
new file mode 100644
index 0000000..9ead775
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.entrypoint;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.Utils;
+import org.apache.flink.yarn.YarnConfigKeys;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This class contains utility methods for the {@link YarnSessionClusterEntrypoint} and
+ * {@link YarnJobClusterEntrypoint}.
+ */
+public class YarnEntrypointUtils {
+
+	public static SecurityContext installSecurityContext(
+			Configuration configuration,
+			String workingDirectory) throws Exception {
+		org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
+
+		//To support Yarn Secure Integration Test Scenario
+		File krb5Conf = new File(workingDirectory, Utils.KRB5_FILE_NAME);
+		if (krb5Conf.exists() && krb5Conf.canRead()) {
+			hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+		}
+
+		SecurityUtils.SecurityConfiguration sc;
+		if (hadoopConfiguration != null) {
+			sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
+		} else {
+			sc = new SecurityUtils.SecurityConfiguration(configuration);
+		}
+
+		SecurityUtils.install(sc);
+
+		return SecurityUtils.getInstalledContext();
+	}
+
+	public static Configuration loadConfiguration(String workingDirectory, Map<String, String> env) {
+		Configuration configuration = GlobalConfiguration.loadConfiguration(workingDirectory);
+
+		final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+
+		final String zooKeeperNamespace = env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
+
+		final Map<String, String> dynamicProperties = FlinkYarnSessionCli.getDynamicProperties(
+			env.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+
+		final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key());
+		Preconditions.checkState(
+			hostname != null,
+			"ApplicationMaster hostname variable %s not set",
+			ApplicationConstants.Environment.NM_HOST.key());
+
+		configuration.setString(JobManagerOptions.ADDRESS, hostname);
+
+		// TODO: Support port ranges for the AM
+//		final String portRange = configuration.getString(
+//			ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+//			ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+		for (Map.Entry<String, String> property : dynamicProperties.entrySet()) {
+			configuration.setString(property.getKey(), property.getValue());
+		}
+
+		if (zooKeeperNamespace != null) {
+			configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zooKeeperNamespace);
+		}
+
+		// if a web monitor shall be started, set the port to random binding
+		if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
+			configuration.setInteger(JobManagerOptions.WEB_PORT, 0);
+		}
+
+		// if the user has set the deprecated YARN-specific config keys, we add the
+		// corresponding generic config keys instead. that way, later code need not
+		// deal with deprecated config keys
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
+			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
+			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+
+		final String keytabPath;
+
+		if (env.get(YarnConfigKeys.KEYTAB_PATH) == null) {
+			keytabPath = null;
+		} else {
+			File f = new File(workingDirectory, Utils.KEYTAB_FILE_NAME);
+			keytabPath = f.getAbsolutePath();
+		}
+
+		if (keytabPath != null && remoteKeytabPrincipal != null) {
+			configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+			configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
+		}
+
+		return configuration;
+	}
+
+	public static void logYarnEnvironmentInformation(Map<String, String> env, Logger log) throws IOException {
+		final String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+		Preconditions.checkArgument(
+			yarnClientUsername != null,
+			"YARN client user name environment variable %s not set",
+			YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+		UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+		log.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+			currentUser.getShortUserName(), yarnClientUsername);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
new file mode 100644
index 0000000..7eabdd5
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnResourceManager;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+/**
+ * Entry point for Yarn per-job clusters.
+ */
+public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
+
+	/** The job graph file path. */
+	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+	private final String workingDirectory;
+
+	public YarnJobClusterEntrypoint(
+			Configuration configuration,
+			String workingDirectory) {
+
+		super(configuration);
+		this.workingDirectory = Preconditions.checkNotNull(workingDirectory);
+	}
+
+	@Override
+	protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
+		return YarnEntrypointUtils.installSecurityContext(configuration, workingDirectory);
+	}
+
+	@Override
+	protected ResourceManager<?> createResourceManager(
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			rmServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new YarnResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			configuration,
+			System.getenv(),
+			rmConfiguration,
+			highAvailabilityServices,
+			heartbeatServices,
+			rmRuntimeServices.getSlotManager(),
+			metricRegistry,
+			rmRuntimeServices.getJobLeaderIdService(),
+			fatalErrorHandler);
+	}
+
+	@Override
+	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
+		File fp = new File(jobGraphFile);
+
+		try (FileInputStream input = new FileInputStream(fp);
+			ObjectInputStream obInput = new ObjectInputStream(input)) {
+
+			return (JobGraph) obInput.readObject();
+		} catch (FileNotFoundException e) {
+			throw new FlinkException("Could not find the JobGraph file.", e);
+		} catch (ClassNotFoundException | IOException e) {
+			throw new FlinkException("Could not load the JobGraph from file.", e);
+		}
+	}
+
+	public static void main(String[] args) {
+		Map<String, String> env = System.getenv();
+
+		final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
+		Preconditions.checkArgument(
+			workingDirectory != null,
+			"Working directory variable (%s) not set",
+			ApplicationConstants.Environment.PWD.key());
+
+		try {
+			YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
+		} catch (IOException e) {
+			LOG.warn("Could not log YARN environment information.", e);
+		}
+
+		Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env);
+
+		YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
+			configuration,
+			workingDirectory);
+
+		yarnJobClusterEntrypoint.startCluster();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f37efb/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
new file mode 100644
index 0000000..cbc55a6
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnResourceManager;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Entry point for Yarn session clusters.
+ */
+public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
+
+	private final String workingDirectory;
+
+	public YarnSessionClusterEntrypoint(
+			Configuration configuration,
+			String workingDirectory) {
+		super(configuration);
+		this.workingDirectory = Preconditions.checkNotNull(workingDirectory);
+	}
+
+	@Override
+	protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
+		return YarnEntrypointUtils.installSecurityContext(configuration, workingDirectory);
+	}
+
+	@Override
+	protected ResourceManager<?> createResourceManager(
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			rmServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new YarnResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			configuration,
+			System.getenv(),
+			rmConfiguration,
+			highAvailabilityServices,
+			heartbeatServices,
+			rmRuntimeServices.getSlotManager(),
+			metricRegistry,
+			rmRuntimeServices.getJobLeaderIdService(),
+			fatalErrorHandler);
+	}
+
+	public static void main(String[] args) {
+		Map<String, String> env = System.getenv();
+
+		final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
+		Preconditions.checkArgument(
+			workingDirectory != null,
+			"Working directory variable (%s) not set",
+			ApplicationConstants.Environment.PWD.key());
+
+		try {
+			YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
+		} catch (IOException e) {
+			LOG.warn("Could not log YARN environment information.", e);
+		}
+
+		Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env);
+
+		YarnSessionClusterEntrypoint yarnSessionClusterEntrypoint = new YarnSessionClusterEntrypoint(
+			configuration,
+			workingDirectory);
+
+		yarnSessionClusterEntrypoint.startCluster();
+	}
+}