You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:44 UTC

[11/50] [abbrv] flink git commit: [FLINK-3929] Added Keytab based Kerberos support to enable secure Flink cluster deployment (addresses HDFS, Kafka and ZK services)

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index f4c2032..848013c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -59,7 +60,6 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -341,26 +341,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	@Override
 	public YarnClusterClient deploy() {
-
 		try {
-
-			UserGroupInformation.setConfiguration(conf);
-			UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-			if (UserGroupInformation.isSecurityEnabled()) {
-				if (!ugi.hasKerberosCredentials()) {
-					throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
-						"You may use kinit to authenticate and request a TGT from the Kerberos server.");
-				}
-				return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
-					@Override
-					public YarnClusterClient run() throws Exception {
-						return deployInternal();
-					}
-				});
-			} else {
-				return deployInternal();
-			}
+			return deployInternal();
 		} catch (Exception e) {
 			throw new RuntimeException("Couldn't deploy Yarn cluster", e);
 		}
@@ -539,9 +521,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		addLibFolderToShipFiles(effectiveShipFiles);
+		//check if there is a JAAS config file
+		File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityContext.JAAS_CONF_FILENAME);
+		if (jaasConfigFile.exists() && jaasConfigFile.isFile()) {
+			effectiveShipFiles.add(jaasConfigFile);
+		}
 
-		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j);
+		addLibFolderToShipFiles(effectiveShipFiles);
 
 		// Set-up ApplicationSubmissionContext for the application
 		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
@@ -626,8 +612,53 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
 		fs.setPermission(sessionFilesDir, permission); // set permission for path.
 
+		//To support the Yarn secure integration test scenario:
+		//in the integration test setup, the Yarn containers created by YarnMiniCluster do not have the Yarn site XML
+		//and KRB5 configuration files. We add these files as container local resources so that the container
+		//applications (JM/TMs) have a proper secure cluster setup
+		Path remoteKrb5Path = null;
+		Path remoteYarnSiteXmlPath = null;
+		boolean hasKrb5 = false;
+		if(System.getenv("IN_TESTS") != null) {
+			String krb5Config = System.getProperty("java.security.krb5.conf");
+			if(krb5Config != null && krb5Config.length() != 0) {
+				File krb5 = new File(krb5Config);
+				LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
+				LocalResource krb5ConfResource = Records.newRecord(LocalResource.class);
+				Path krb5ConfPath = new Path(krb5.getAbsolutePath());
+				remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
+				localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+
+				File f = new File(System.getenv("YARN_CONF_DIR"),Utils.YARN_SITE_FILE_NAME);
+				LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
+				LocalResource yarnConfResource = Records.newRecord(LocalResource.class);
+				Path yarnSitePath = new Path(f.getAbsolutePath());
+				remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory());
+				localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+
+				hasKrb5 = true;
+			}
+		}
+
 		// setup security tokens
-		Utils.setTokensFor(amContainer, paths, conf);
+		LocalResource keytabResource = null;
+		Path remotePathKeytab = null;
+		String keytab = flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+		if(keytab != null) {
+			LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
+			keytabResource = Records.newRecord(LocalResource.class);
+			Path keytabPath = new Path(keytab);
+			remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory());
+			localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+		}
+
+		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
+
+		if ( UserGroupInformation.isSecurityEnabled() && keytab == null ) {
+			//set tokens only when keytab is not provided
+			LOG.info("Adding delegation token to the AM container..");
+			Utils.setTokensFor(amContainer, paths, conf);
+		}
 
 		amContainer.setLocalResources(localResources);
 		fs.close();
@@ -646,11 +677,25 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
-		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
 		appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
 		appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
 		appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
 
+		// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
+		appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+		if(keytabResource != null) {
+			appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString() );
+			String principal = flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+			appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal );
+		}
+
+		//To support Yarn Secure Integration Test Scenario
+		if(remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
+			appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
+			appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString() );
+		}
+
 		if(dynamicPropertiesEncoded != null) {
 			appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
 		}
@@ -700,6 +745,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 				throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
 			}
 			YarnApplicationState appState = report.getYarnApplicationState();
+			LOG.debug("Application State: {}", appState);
 			switch(appState) {
 				case FAILED:
 				case FINISHED:
@@ -996,7 +1042,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 	}
 
-	protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback, boolean hasLog4j) {
+	protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback,
+																	boolean hasLog4j,
+																	boolean hasKrb5) {
 		// ------------------ Prepare Application Master Container  ------------------------------
 
 		// respect custom JVM options in the YAML file
@@ -1021,6 +1069,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
+		//applicable only for YarnMiniCluster secure test run
+		//krb5.conf file will be available as local resource in JM/TM container
+		if(hasKrb5) {
+			amCommand += " -Djava.security.krb5.conf=krb5.conf";
+		}
+
 		amCommand += " " + getApplicationMasterClass().getName() + " "
 			+ " 1>"
 			+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 1496d61..94d4582 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -60,6 +60,14 @@ public final class Utils {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
 
+	/** Keytab file name populated in YARN container */
+	public static final String KEYTAB_FILE_NAME = "krb5.keytab";
+
+	/** KRB5 file name populated in YARN container for secure IT run */
+	public static final String KRB5_FILE_NAME = "krb5.conf";
+
+	/** Yarn site xml file name populated in YARN container for secure IT run */
+	public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
 
 	/**
 	 * See documentation

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 6619633..efb658a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -40,11 +41,11 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -60,11 +61,10 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.UUID;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
@@ -117,7 +117,7 @@ public class YarnApplicationMasterRunner {
 
 	/**
 	 * The instance entry point for the YARN application master. Obtains user group
-	 * information and calls the main work method {@link #runApplicationMaster()} as a
+	 * information and calls the main work method {@link #runApplicationMaster(Configuration)} as a
 	 * privileged action.
 	 *
 	 * @param args The command line arguments.
@@ -127,34 +127,66 @@ public class YarnApplicationMasterRunner {
 		try {
 			LOG.debug("All environment variables: {}", ENV);
 
-			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
 			require(yarnClientUsername != null, "YARN client user name environment variable {} not set",
-				YarnConfigKeys.ENV_CLIENT_USERNAME);
+				YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
-			final UserGroupInformation currentUser;
-			try {
-				currentUser = UserGroupInformation.getCurrentUser();
-			} catch (Throwable t) {
-				throw new Exception("Cannot access UserGroupInformation information for current user", t);
+			final String currDir = ENV.get(Environment.PWD.key());
+			require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+			LOG.debug("Current working Directory: {}", currDir);
+
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+			LOG.debug("remoteKeytabPath obtained {}", remoteKeytabPath);
+
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+			LOG.info("remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
+
+			String keytabPath = null;
+			if(remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.debug("keytabPath: {}", keytabPath);
 			}
 
-			LOG.info("YARN daemon runs as user {}. Running Flink Application Master/JobManager as user {}",
-				currentUser.getShortUserName(), yarnClientUsername);
+			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+					currentUser.getShortUserName(), yarnClientUsername );
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			//To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if(krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
 
-			UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+			// Flink configuration
+			final Map<String, String> dynamicProperties =
+					FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
 
-			// transfer all security tokens, for example for authenticated HDFS and HBase access
-			for (Token<?> token : currentUser.getTokens()) {
-				ugi.addToken(token);
+			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
+			flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
 
-			// run the actual work in a secured privileged action
-			return ugi.doAs(new PrivilegedAction<Integer>() {
+			SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
 				@Override
 				public Integer run() {
-					return runApplicationMaster();
+					return runApplicationMaster(flinkConfig);
 				}
 			});
+
 		}
 		catch (Throwable t) {
 			// make sure that everything whatever ends up in the log
@@ -172,7 +204,7 @@ public class YarnApplicationMasterRunner {
 	 *
 	 * @return The return code for the Java process.
 	 */
-	protected int runApplicationMaster() {
+	protected int runApplicationMaster(Configuration config) {
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
 
@@ -194,12 +226,21 @@ public class YarnApplicationMasterRunner {
 
 			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
 
-			// Flink configuration
-			final Map<String, String> dynamicProperties =
-				FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+			//Update keytab and principal path to reflect YARN container path location
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
 
-			final Configuration config = createConfiguration(currDir, dynamicProperties);
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+
+			String keytabPath = null;
+			if(remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.info("keytabPath: {}", keytabPath);
+			}
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
 
 			// Hadoop/Yarn configuration (loads config data automatically from classpath files)
 			final YarnConfiguration yarnConfig = new YarnConfiguration();
@@ -523,8 +564,20 @@ public class YarnApplicationMasterRunner {
 		String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
 		require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
 
-		String yarnClientUsername = env.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
-		require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_USERNAME);
+		String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+		require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+		final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
+		LOG.info("TM:remoteKeytabPath obtained {}", remoteKeytabPath);
+
+		final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM:remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
+
+		final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+		LOG.info("TM:remoteYarnConfPath obtained {}", remoteYarnConfPath);
+
+		final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
+		LOG.info("TM:remotekrb5Path obtained {}", remoteKrb5Path);
 
 		String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
 		require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
@@ -537,6 +590,33 @@ public class YarnApplicationMasterRunner {
 			throw new Exception("Could not access YARN's default file system", e);
 		}
 
+		//register keytab
+		LocalResource keytabResource = null;
+		if(remoteKeytabPath != null) {
+			LOG.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
+			keytabResource = Records.newRecord(LocalResource.class);
+			Path keytabPath = new Path(remoteKeytabPath);
+			Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
+		}
+
+		//To support Yarn Secure Integration Test Scenario
+		LocalResource yarnConfResource = null;
+		LocalResource krb5ConfResource = null;
+		boolean hasKrb5 = false;
+		if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+			LOG.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
+			yarnConfResource = Records.newRecord(LocalResource.class);
+			Path yarnConfPath = new Path(remoteYarnConfPath);
+			Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
+
+			LOG.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
+			krb5ConfResource = Records.newRecord(LocalResource.class);
+			Path krb5ConfPath = new Path(remoteKrb5Path);
+			Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
+
+			hasKrb5 = true;
+		}
+
 		// register Flink Jar with remote HDFS
 		LocalResource flinkJar = Records.newRecord(LocalResource.class);
 		{
@@ -563,6 +643,16 @@ public class YarnApplicationMasterRunner {
 		taskManagerLocalResources.put("flink.jar", flinkJar);
 		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
 
+		//To support Yarn Secure Integration Test Scenario
+		if(yarnConfResource != null && krb5ConfResource != null) {
+			taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+			taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+		}
+
+		if(keytabResource != null) {
+			taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+		}
+
 		// prepare additional files to be shipped
 		for (String pathStr : shipListString.split(",")) {
 			if (!pathStr.isEmpty()) {
@@ -582,7 +672,7 @@ public class YarnApplicationMasterRunner {
 
 		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
 			flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-			hasLogback, hasLog4j, taskManagerMainClass);
+			hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
 
 		log.info("Starting TaskManagers with command: " + launchCommand);
 
@@ -597,11 +687,17 @@ public class YarnApplicationMasterRunner {
 		containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
 		Utils.setupYarnClassPath(yarnConfig, containerEnv);
 
-		containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, yarnClientUsername);
+		containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+		if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+			containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
+			containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
+		}
 
 		ctx.setEnvironment(containerEnv);
 
 		try (DataOutputBuffer dob = new DataOutputBuffer()) {
+			LOG.debug("Adding security tokens to Task Manager Container launch Context....");
 			UserGroupInformation user = UserGroupInformation.getCurrentUser();
 			Credentials credentials = user.getCredentials();
 			credentials.writeTokenStorageToStream(dob);

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index b14d7b7..ada241c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -32,7 +32,6 @@ public class YarnConfigKeys {
 	public final static String ENV_APP_ID = "_APP_ID";
 	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
 	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
 	public static final String ENV_SLOTS = "_SLOTS";
 	public static final String ENV_DETACHED = "_DETACHED";
 	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
@@ -41,8 +40,13 @@ public class YarnConfigKeys {
 
 	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
 
+	public final static String KEYTAB_PATH = "_KEYTAB_PATH";
+	public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+	public final static String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
 	public static final String ENV_ZOOKEEPER_NAMESPACE = "_ZOOKEEPER_NAMESPACE";
 
+	public static final String ENV_KRB5_PATH = "_KRB5_PATH";
+	public static final String ENV_YARN_SITE_XML_PATH = "_YARN_SITE_XML_PATH";
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 9638137..c70a30b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -18,22 +18,22 @@
 
 package org.apache.flink.yarn;
 
+import java.io.File;
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.Map;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 
 import org.slf4j.Logger;
@@ -64,8 +64,18 @@ public class YarnTaskManagerRunner {
 
 		// read the environment variables for YARN
 		final Map<String, String> envs = System.getenv();
-		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
 		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+		LOG.info("Current working/local Directory: {}", localDirs);
+
+		final String currDir = envs.get(Environment.PWD.key());
+		LOG.info("Current working Directory: {}", currDir);
+
+		final String remoteKeytabPath = envs.get(YarnConfigKeys.KEYTAB_PATH);
+		LOG.info("TM: remoteKeytabPath obtained {}", remoteKeytabPath);
+
+		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
 
 		// configure local directory
 		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
@@ -78,34 +88,66 @@ public class YarnTaskManagerRunner {
 				"specified in the Flink config: " + flinkTempDirs);
 		}
 
-		LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() +
-			"' setting user to execute Flink TaskManager to '" + yarnClientUsername + "'");
-
 		// tell akka to die in case of an error
 		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
 
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
+		String keytabPath = null;
+		if(remoteKeytabPath != null) {
+			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+			keytabPath = f.getAbsolutePath();
+			LOG.info("keytabPath: {}", keytabPath);
 		}
 
+		UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+		LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+				currentUser.getShortUserName(), yarnClientUsername );
+
 		// Infer the resource identifier from the environment variable
 		String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
 		final ResourceID resourceId = new ResourceID(containerID);
 		LOG.info("ResourceID assigned for this container: {}", resourceId);
 
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
-				}
-				catch (Throwable t) {
-					LOG.error("Error while starting the TaskManager", t);
-					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-				}
-				return null;
+		try {
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			//To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if(krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
+
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
-		});
+			configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
+
+			SecurityContext.install(sc.setFlinkConfiguration(configuration));
+
+			SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					try {
+						TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
+					}
+					catch (Throwable t) {
+						LOG.error("Error while starting the TaskManager", t);
+						System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+					}
+					return null;
+				}
+			});
+		} catch(Exception e) {
+			LOG.error("Exception occurred while launching Task Manager. Reason: {}", e);
+			throw new RuntimeException(e);
+		}
+
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 8f02a1c..b5364f0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,11 +24,14 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -460,9 +463,27 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		}
 	}
 
-	public static void main(String[] args) {
-		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
-		System.exit(cli.run(args));
+	public static void main(final String[] args) {
+		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
+
+		String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
+		GlobalConfiguration.loadConfiguration(confDirPath);
+		Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+		flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, confDirPath);
+		try {
+			SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
+			int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					return cli.run(args);
+				}
+			});
+			System.exit(retCode);
+		} catch(Exception e) {
+			e.printStackTrace();
+			LOG.error("Exception Occured. Reason: {}", e);
+			return;
+		}
 	}
 
 	@Override
@@ -523,6 +544,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		try {
 			return yarnClusterDescriptor.deploy();
 		} catch (Exception e) {
+			LOG.error("Error while deploying YARN cluster: "+e.getMessage(), e);
 			throw new RuntimeException("Error deploying the YARN cluster", e);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 02e868e..5b3148a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,13 @@ under the License.
 		<jackson.version>2.7.4</jackson.version>
 		<metrics.version>3.1.0</metrics.version>
 		<junit.version>4.11</junit.version>
+		<!--
+			Keeping the MiniKDC version fixed instead of taking a Hadoop version dependency
+			to support testing modules such as Kafka and ZK that do not have a Hadoop dependency.
+			Starting with Hadoop 3, org.apache.kerby will be used instead of MiniKDC. We may have
+			to revisit the impact at that time.
+		-->
+		<minikdc.version>2.7.2</minikdc.version>
 	</properties>
 
 	<dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 53379b4..476cee3 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -45,3 +45,6 @@ log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
 # the tests
 log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console
 log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO, console
+log4j.logger.org.apache.flink.streaming.connectors.kafka=INFO, console
+log4j.logger.org.I0Itec.zkclient=INFO, console
+log4j.logger.org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread=OFF
\ No newline at end of file