You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/11/25 13:59:29 UTC

flink git commit: [FLINK-5055][security] skip Hadoop UGI login if unsecured

Repository: flink
Updated Branches:
  refs/heads/master 870e219d9 -> 1b2f3c06d


[FLINK-5055][security] skip Hadoop UGI login if unsecured

The new Kerberos authentication code in Flink assumed that it's running
against vanilla Hadoop. Original Hadoop's behavior is to skip a secure
login if security is not configured. This is different for other
distributions, e.g. the MapR Hadoop distribution of Hadoop.

Thus, we need to make sure we don't perform any login action if security
is not configured.

This also performs minor code cleanup.

This closes #2864.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b2f3c06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b2f3c06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b2f3c06

Branch: refs/heads/master
Commit: 1b2f3c06d2296d5e628b66a3c13126a546958d74
Parents: 870e219
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Nov 24 17:12:39 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Nov 25 14:58:20 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  11 +-
 .../runtime/security/HadoopSecurityContext.java |  49 ++++
 .../runtime/security/NoOpSecurityContext.java   |  32 +++
 .../flink/runtime/security/SecurityContext.java | 283 +------------------
 .../flink/runtime/security/SecurityUtils.java   | 267 ++++++++++++++---
 .../flink/runtime/jobmanager/JobManager.scala   |  12 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  11 +-
 .../runtime/security/SecurityContextTest.java   |  77 -----
 .../runtime/security/SecurityUtilsTest.java     |  78 +++++
 .../connectors/fs/RollingSinkSecuredITCase.java |   5 +-
 .../flink/test/util/SecureTestEnvironment.java  |   7 +-
 .../flink/test/util/TestingSecurityContext.java |   8 +-
 .../yarn/YARNSessionFIFOSecuredITCase.java      |  13 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   4 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  33 +--
 .../flink/yarn/YarnTaskManagerRunner.java       |  11 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   9 +-
 17 files changed, 464 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 063cc83..1ec0674 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -97,6 +97,7 @@ import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
@@ -1111,11 +1112,11 @@ public class CliFrontend {
 
 		try {
 			final CliFrontend cli = new CliFrontend();
-			SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(cli.config));
-			int retCode = SecurityContext.getInstalled()
-					.runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+			SecurityUtils.install(new SecurityUtils.SecurityConfiguration(cli.config));
+			int retCode = SecurityUtils.getInstalledContext()
+					.runSecured(new Callable<Integer>() {
 						@Override
-						public Integer run() {
+						public Integer call() {
 							return cli.parseParameters(args);
 						}
 					});

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
new file mode 100644
index 0000000..ea6e5e3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+
+/*
+ * Process-wide security context object which initializes UGI with appropriate security credentials and also it
+ * creates in-memory JAAS configuration object which will serve appropriate ApplicationConfigurationEntry for the
+ * connector login module implementation that authenticates Kerberos identity using SASL/JAAS based mechanism.
+ */
+class HadoopSecurityContext implements SecurityContext {
+
+	private UserGroupInformation ugi;
+
+	HadoopSecurityContext(UserGroupInformation ugi) {
+		this.ugi = Preconditions.checkNotNull(ugi, "UGI passed cannot be null");
+	}
+
+	public <T> T runSecured(final Callable<T> securedCallable) throws Exception {
+		return ugi.doAs(new PrivilegedExceptionAction<T>() {
+			@Override
+			public T run() throws Exception {
+				return securedCallable.call();
+			}
+		});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
new file mode 100644
index 0000000..4574db5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A security context that simply runs a Callable without performing a login action.
+ */
+class NoOpSecurityContext implements SecurityContext {
+
+	@Override
+	public <T> T runSecured(Callable<T> securedCallable) throws Exception {
+		return securedCallable.call();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index f1f9533..02892d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -15,288 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.runtime.security;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Method;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
+import java.util.concurrent.Callable;
 
-/*
- * Process-wide security context object which initializes UGI with appropriate security credentials and also it
- * creates in-memory JAAS configuration object which will serve appropriate ApplicationConfigurationEntry for the
- * connector login module implementation that authenticates Kerberos identity using SASL/JAAS based mechanism.
+/**
+ * A security context with may be required to run a Callable.
  */
-@Internal
-public class SecurityContext {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SecurityContext.class);
-
-	public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
-
-	private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
-
-	private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
-
-	private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
-
-	private static SecurityContext installedContext;
-
-	public static SecurityContext getInstalled() { return installedContext; }
-
-	private UserGroupInformation ugi;
-
-	SecurityContext(UserGroupInformation ugi) {
-		if(ugi == null) {
-			throw new RuntimeException("UGI passed cannot be null");
-		}
-		this.ugi = ugi;
-	}
-
-	public <T> T runSecured(final FlinkSecuredRunner<T> runner) throws Exception {
-		return ugi.doAs(new PrivilegedExceptionAction<T>() {
-			@Override
-			public T run() throws Exception {
-				return runner.run();
-			}
-		});
-	}
-
-	public static void install(SecurityConfiguration config) throws Exception {
-
-		// perform static initialization of UGI, JAAS
-		if(installedContext != null) {
-			LOG.warn("overriding previous security context");
-		}
-
-		// establish the JAAS config
-		JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
-		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-		populateSystemSecurityProperties(config.flinkConf);
-
-		// establish the UGI login user
-		UserGroupInformation.setConfiguration(config.hadoopConf);
-
-		UserGroupInformation loginUser;
-
-		if(UserGroupInformation.isSecurityEnabled() &&
-				config.keytab != null && !StringUtils.isBlank(config.principal)) {
-			String keytabPath = (new File(config.keytab)).getAbsolutePath();
-
-			UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-			loginUser = UserGroupInformation.getLoginUser();
-
-			// supplement with any available tokens
-			String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-			if(fileLocation != null) {
-				/*
-				 * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
-				 * used in the context of reading the stored tokens from UGI.
-				 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-				 * loginUser.addCredentials(cred);
-				*/
-				try {
-					Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-							File.class, org.apache.hadoop.conf.Configuration.class);
-					Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null,new File(fileLocation),
-							config.hadoopConf);
-					Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
-							Credentials.class);
-					addCredentialsMethod.invoke(loginUser,cred);
-				} catch(NoSuchMethodException e) {
-					LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
-				}
-			}
-		} else {
-			// login with current user credentials (e.g. ticket cache)
-			try {
-				//Use reflection API to get the login user object
-				//UserGroupInformation.loginUserFromSubject(null);
-				Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-				Subject subject = null;
-				loginUserFromSubjectMethod.invoke(null,subject);
-			} catch(NoSuchMethodException e) {
-				LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
-			}
-
-			loginUser = UserGroupInformation.getLoginUser();
-			// note that the stored tokens are read automatically
-		}
-
-		boolean delegationToken = false;
-		final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
-		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
-		for(Token<? extends TokenIdentifier> token : usrTok) {
-			final Text id = new Text(token.getIdentifier());
-			LOG.debug("Found user token " + id + " with " + token);
-			if(token.getKind().equals(HDFS_DELEGATION_KIND)) {
-				delegationToken = true;
-			}
-		}
-
-		if(UserGroupInformation.isSecurityEnabled() && !loginUser.hasKerberosCredentials()) {
-			//throw an error in non-yarn deployment if kerberos cache is not available
-			if(!delegationToken) {
-				LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
-				throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
-			}
-		}
-
-		installedContext = new SecurityContext(loginUser);
-	}
-
-	/*
-	 * This method configures some of the system properties that are require for ZK and Kafka SASL authentication
-	 * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
-	 * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-	 * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and
-	 * Kafka current code behavior.
-	 */
-	private static void populateSystemSecurityProperties(Configuration configuration) {
-		Preconditions.checkNotNull(configuration, "The supplied configuation was null");
-
-		//required to be empty for Kafka but we will override the property
-		//with pseudo JAAS configuration file if SASL auth is enabled for ZK
-		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
-
-		boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
-
-		if (disableSaslClient) {
-			LOG.info("SASL client auth for ZK will be disabled");
-			//SASL auth is disabled by default but will be enabled if specified in configuration
-			System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
-			return;
-		}
-
-		// load Jaas config file to initialize SASL
-		final File jaasConfFile;
-		try {
-			Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
-			InputStream jaasConfStream = SecurityContext.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
-			Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
-			jaasConfFile = jaasConfPath.toFile();
-			jaasConfFile.deleteOnExit();
-			jaasConfStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
-				"locate pseudo Jaas config provided with Flink", e);
-		}
-
-		LOG.info("Enabling {} property with pseudo JAAS config file: {}",
-				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
-
-		//ZK client module lookup the configuration to handle SASL.
-		//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
-		System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
-
-		String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
-		if (!StringUtils.isBlank(zkSaslServiceName)) {
-			LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
-			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
-		}
-
-	}
-
-	/**
-	 * Inputs for establishing the security context.
-	 */
-	public static class SecurityConfiguration {
-
-		Configuration flinkConf;
-
-		org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
-
-		String keytab;
-
-		String principal;
-
-		public SecurityConfiguration() {
-			this.flinkConf = GlobalConfiguration.loadConfiguration();
-		}
-
-		public String getKeytab() {
-			return keytab;
-		}
-
-		public String getPrincipal() {
-			return principal;
-		}
-
-		public SecurityConfiguration setFlinkConfiguration(Configuration flinkConf) {
-
-			this.flinkConf = flinkConf;
-
-			String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
-
-			String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
-
-			validate(keytab, principal);
-
-			LOG.debug("keytab {} and principal {} .", keytab, principal);
-
-			this.keytab = keytab;
-
-			this.principal = principal;
-
-			return this;
-		}
-
-		public SecurityConfiguration setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
-			this.hadoopConf = conf;
-			return this;
-		}
-
-		private void validate(String keytab, String principal) {
-
-			if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
-					!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal)) {
-				if(StringUtils.isBlank(keytab)) {
-					LOG.warn("Keytab is null or empty");
-				}
-				if(StringUtils.isBlank(principal)) {
-					LOG.warn("Principal is null or empty");
-				}
-				throw new RuntimeException("Requires both keytab and principal to be provided");
-			}
-
-			if(!StringUtils.isBlank(keytab)) {
-				File keytabFile = new File(keytab);
-				if(!keytabFile.exists() || !keytabFile.isFile()) {
-					LOG.warn("Not a valid keytab: {} file", keytab);
-					throw new RuntimeException("Invalid keytab file: " + keytab + " passed");
-				}
-			}
-
-		}
-	}
+public interface SecurityContext {
 
-	public interface FlinkSecuredRunner<T> {
-		T run() throws Exception;
-	}
+	<T> T runSecured(Callable<T> securedCallable) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index 8747efe..f6e0a8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -15,57 +15,260 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.security;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.security.PrivilegedExceptionAction;
+import javax.security.auth.Subject;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Collection;
 
-/**
- * A utility class that lets program code run in a security context provided by the
- * Hadoop security user groups.
- * 
- * The secure context will for example pick up authentication information from Kerberos.
+/*
+ * Utils for configuring security. The following security mechanism are supported:
+ *
+ * 1. Java Authentication and Authorization Service (JAAS)
+ * 2. Hadoop's User Group Information (UGI)
  */
-public final class SecurityUtils {
+public class SecurityUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
 
-	// load Hadoop configuration when loading the security utils.
-	private static Configuration hdConf = new Configuration();
-	
-	
-	public static boolean isSecurityEnabled() {
-		UserGroupInformation.setConfiguration(hdConf);
-		return UserGroupInformation.isSecurityEnabled();
-	}
+	public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
+
+	private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+	private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
+
+	private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+	private static SecurityContext installedContext = new NoOpSecurityContext();
 
-	public static <T> T runSecured(final FlinkSecuredRunner<T> runner) throws Exception {
-		UserGroupInformation.setConfiguration(hdConf);
-		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-		if (!ugi.hasKerberosCredentials()) {
-			LOG.error("Security is enabled but no Kerberos credentials have been found. " +
-						"You may authenticate using the kinit command.");
+	public static SecurityContext getInstalledContext() { return installedContext; }
+
+	/**
+	 * Performs a static initialization of the JAAS and Hadoop UGI security mechanism
+	 */
+	public static void install(SecurityConfiguration config) throws Exception {
+
+		if (!(installedContext instanceof NoOpSecurityContext)) {
+			LOG.warn("overriding previous security context");
 		}
-		return ugi.doAs(new PrivilegedExceptionAction<T>() {
-			@Override
-			public T run() throws Exception {
-				return runner.run();
+
+		// establish the JAAS config
+		JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
+		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+
+		populateSystemSecurityProperties(config.flinkConf);
+
+		// establish the UGI login user
+		UserGroupInformation.setConfiguration(config.hadoopConf);
+
+		// only configure Hadoop security if we have security enabled
+		if (UserGroupInformation.isSecurityEnabled()) {
+
+			final UserGroupInformation loginUser;
+
+			if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
+				String keytabPath = (new File(config.keytab)).getAbsolutePath();
+
+				UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
+
+				loginUser = UserGroupInformation.getLoginUser();
+
+				// supplement with any available tokens
+				String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+				if (fileLocation != null) {
+				/*
+				 * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
+				 * used in the context of reading the stored tokens from UGI.
+				 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+				 * loginUser.addCredentials(cred);
+				*/
+					try {
+						Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+							File.class, org.apache.hadoop.conf.Configuration.class);
+						Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
+							config.hadoopConf);
+						Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
+							Credentials.class);
+						addCredentialsMethod.invoke(loginUser, cred);
+					} catch (NoSuchMethodException e) {
+						LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+					}
+				}
+			} else {
+				// login with current user credentials (e.g. ticket cache)
+				try {
+					//Use reflection API to get the login user object
+					//UserGroupInformation.loginUserFromSubject(null);
+					Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+					Subject subject = null;
+					loginUserFromSubjectMethod.invoke(null, subject);
+				} catch (NoSuchMethodException e) {
+					LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+				}
+
+				// note that the stored tokens are read automatically
+				loginUser = UserGroupInformation.getLoginUser();
+			}
+
+			boolean delegationToken = false;
+			final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
+			Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+			for (Token<? extends TokenIdentifier> token : usrTok) {
+				final Text id = new Text(token.getIdentifier());
+				LOG.debug("Found user token " + id + " with " + token);
+				if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
+					delegationToken = true;
+				}
 			}
-		});
+
+			if (!loginUser.hasKerberosCredentials()) {
+				//throw an error in non-yarn deployment if kerberos cache is not available
+				if (!delegationToken) {
+					LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+					throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+				}
+			}
+
+			installedContext = new HadoopSecurityContext(loginUser);
+		}
 	}
 
-	public interface FlinkSecuredRunner<T> {
-		T run() throws Exception;
+	/*
+	 * This method configures some of the system properties that are require for ZK and Kafka SASL authentication
+	 * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+	 * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
+	 * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and
+	 * Kafka current code behavior.
+	 */
+	private static void populateSystemSecurityProperties(Configuration configuration) {
+		Preconditions.checkNotNull(configuration, "The supplied configuation was null");
+
+		//required to be empty for Kafka but we will override the property
+		//with pseudo JAAS configuration file if SASL auth is enabled for ZK
+		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+
+		boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
+
+		if (disableSaslClient) {
+			LOG.info("SASL client auth for ZK will be disabled");
+			//SASL auth is disabled by default but will be enabled if specified in configuration
+			System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
+			return;
+		}
+
+		// load Jaas config file to initialize SASL
+		final File jaasConfFile;
+		try {
+			Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
+			InputStream jaasConfStream = SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
+			Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+			jaasConfFile = jaasConfPath.toFile();
+			jaasConfFile.deleteOnExit();
+			jaasConfStream.close();
+		} catch (IOException e) {
+			throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
+				"locate pseudo Jaas config provided with Flink", e);
+		}
+
+		LOG.info("Enabling {} property with pseudo JAAS config file: {}",
+				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
+
+		//ZK client module lookup the configuration to handle SASL.
+		//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
+		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
+		System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
+
+		String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+		if (!StringUtils.isBlank(zkSaslServiceName)) {
+			LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
+			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
+		}
+
 	}
 
 	/**
-	 * Private constructor to prevent instantiation.
+	 * Inputs for establishing the security context.
 	 */
-	private SecurityUtils() {
-		throw new RuntimeException();
+	public static class SecurityConfiguration {
+
+		private Configuration flinkConf;
+
+		private org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+
+		private String keytab;
+
+		private String principal;
+
+		public String getKeytab() {
+			return keytab;
+		}
+
+		public String getPrincipal() {
+			return principal;
+		}
+
+		public SecurityConfiguration(Configuration flinkConf) {
+			this.flinkConf = flinkConf;
+
+			String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+			String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+			validate(keytab, principal);
+
+			this.keytab = keytab;
+			this.principal = principal;
+		}
+
+		public SecurityConfiguration setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
+			this.hadoopConf = conf;
+			return this;
+		}
+
+		private void validate(String keytab, String principal) {
+			LOG.debug("keytab {} and principal {} .", keytab, principal);
+
+			if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
+					!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal)) {
+				if(StringUtils.isBlank(keytab)) {
+					LOG.warn("Keytab is null or empty");
+				}
+				if(StringUtils.isBlank(principal)) {
+					LOG.warn("Principal is null or empty");
+				}
+				throw new RuntimeException("Requires both keytab and principal to be provided");
+			}
+
+			if(!StringUtils.isBlank(keytab)) {
+				File keytabFile = new File(keytab);
+				if(!keytabFile.exists() || !keytabFile.isFile()) {
+					LOG.warn("Not a valid keytab: {} file", keytab);
+					throw new RuntimeException("Invalid keytab file: " + keytab + " passed");
+				}
+			}
+
+		}
 	}
+
+	// Just a util, shouldn't be instantiated.
+	private SecurityUtils() {}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 197456f..233dbda 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -73,8 +73,8 @@ import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
-import org.apache.flink.runtime.security.SecurityContext
-import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
+import org.apache.flink.runtime.security.SecurityUtils
+import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -1900,6 +1900,7 @@ object JobManager {
     catch {
       case t: Throwable =>
         LOG.error(t.getMessage(), t)
+        t.printStackTrace()
         System.exit(STARTUP_FAILURE_RETURN_CODE)
         null
     }
@@ -1930,11 +1931,11 @@ object JobManager {
     }
 
     // run the job manager
-    SecurityContext.install(new SecurityConfiguration().setFlinkConfiguration(configuration))
+    SecurityUtils.install(new SecurityConfiguration(configuration))
 
     try {
-      SecurityContext.getInstalled.runSecured(new FlinkSecuredRunner[Unit] {
-        override def run(): Unit = {
+      SecurityUtils.getInstalledContext.runSecured(new Callable[Unit] {
+        override def call(): Unit = {
           runJobManager(
             configuration,
             executionMode,
@@ -1945,6 +1946,7 @@ object JobManager {
     } catch {
       case t: Throwable =>
         LOG.error("Failed to run JobManager.", t)
+        t.printStackTrace()
         System.exit(STARTUP_FAILURE_RETURN_CODE)
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index dd5d218..a3b1382 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -22,6 +22,7 @@ import java.io.{File, FileInputStream, IOException}
 import java.lang.management.ManagementFactory
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
+import java.util.concurrent.Callable
 import java.util.{Collections, UUID}
 
 import _root_.akka.actor._
@@ -65,8 +66,8 @@ import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateRegistry
 import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer}
-import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
-import org.apache.flink.runtime.security.SecurityContext
+import org.apache.flink.runtime.security.SecurityUtils
+import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{MathUtils, NetUtils}
@@ -1532,11 +1533,11 @@ object TaskManager {
     val resourceId = ResourceID.generate()
 
     // run the TaskManager (if requested in an authentication enabled context)
-    SecurityContext.install(new SecurityConfiguration().setFlinkConfiguration(configuration))
+    SecurityUtils.install(new SecurityConfiguration(configuration))
 
     try {
-      SecurityContext.getInstalled.runSecured(new FlinkSecuredRunner[Unit] {
-        override def run(): Unit = {
+      SecurityUtils.getInstalledContext.runSecured(new Callable[Unit] {
+        override def call(): Unit = {
           selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
         }
       })

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
deleted file mode 100644
index 3c48e8f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.security;
-
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.Test;
-
-import java.lang.reflect.Method;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the {@link SecurityContext}.
- */
-public class SecurityContextTest {
-
-	@Test
-	public void testCreateInsecureHadoopCtx() {
-		SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
-		try {
-			SecurityContext.install(sc);
-			assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName());
-		} catch (Exception e) {
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testInvalidUGIContext() {
-		try {
-			new SecurityContext(null);
-		} catch (RuntimeException re) {
-			assertEquals("UGI passed cannot be null",re.getMessage());
-		}
-	}
-
-
-	private String getOSUserName() throws Exception {
-		String userName = "";
-		String osName = System.getProperty( "os.name" ).toLowerCase();
-		String className = null;
-
-		if( osName.contains( "windows" ) ){
-			className = "com.sun.security.auth.module.NTSystem";
-		}
-		else if( osName.contains( "linux" ) || osName.contains( "mac" )  ){
-			className = "com.sun.security.auth.module.UnixSystem";
-		}
-		else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){
-			className = "com.sun.security.auth.module.SolarisSystem";
-		}
-
-		if( className != null ){
-			Class<?> c = Class.forName( className );
-			Method method = c.getDeclaredMethod( "getUsername" );
-			Object o = c.newInstance();
-			userName = (String) method.invoke( o );
-		}
-		return userName;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
new file mode 100644
index 0000000..ecb89e0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SecurityUtils}.
+ */
+public class SecurityUtilsTest {
+
+	@Test
+	public void testCreateInsecureHadoopCtx() {
+		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(new Configuration());
+		try {
+			SecurityUtils.install(sc);
+			assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName());
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testInvalidUGIContext() {
+		try {
+			new HadoopSecurityContext(null);
+		} catch (RuntimeException re) {
+			assertEquals("UGI passed cannot be null",re.getMessage());
+		}
+	}
+
+
+	private String getOSUserName() throws Exception {
+		String userName = "";
+		String osName = System.getProperty( "os.name" ).toLowerCase();
+		String className = null;
+
+		if( osName.contains( "windows" ) ){
+			className = "com.sun.security.auth.module.NTSystem";
+		}
+		else if( osName.contains( "linux" ) || osName.contains( "mac" )  ){
+			className = "com.sun.security.auth.module.UnixSystem";
+		}
+		else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){
+			className = "com.sun.security.auth.module.SolarisSystem";
+		}
+
+		if( className != null ){
+			Class<?> c = Class.forName( className );
+			Method method = c.getDeclaredMethod( "getUsername" );
+			Object o = c.newInstance();
+			userName = (String) method.invoke( o );
+		}
+		return userName;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index c005814..eb12d07 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.connectors.fs;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestingSecurityContext;
@@ -121,8 +121,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
 				SecureTestEnvironment.getHadoopServicePrincipal());
 
-		SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
-		ctx.setFlinkConfiguration(flinkConfig);
+		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
 		ctx.setHadoopConfiguration(conf);
 		try {
 			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index 0250c16..de715c6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -112,12 +112,11 @@ public class SecureTestEnvironment {
 			//ctx.setHadoopConfiguration() for the UGI implementation to work properly.
 			//See Yarn test case module for reference
 			createJaasConfig(baseDirForSecureRun);
-			SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
 			Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
 			flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
 			flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
 			flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false);
-			ctx.setFlinkConfiguration(flinkConfig);
+			SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
 			TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
 
 			populateJavaPropertyVariables();
@@ -227,7 +226,7 @@ public class SecureTestEnvironment {
 	 */
 	private static void  createJaasConfig(File baseDirForSecureRun) {
 
-		try(FileWriter fw = new FileWriter(new File(baseDirForSecureRun,SecurityContext.JAAS_CONF_FILENAME), true);
+		try(FileWriter fw = new FileWriter(new File(baseDirForSecureRun, SecurityUtils.JAAS_CONF_FILENAME), true);
 			BufferedWriter bw = new BufferedWriter(fw);
 			PrintWriter out = new PrintWriter(bw))
 		{

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
index 5e84c7e..4343013 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -19,7 +19,7 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 
 import java.util.Map;
 
@@ -30,11 +30,11 @@ import java.util.Map;
 @Internal
 public class TestingSecurityContext {
 
-	public static void install(SecurityContext.SecurityConfiguration config,
+	public static void install(SecurityUtils.SecurityConfiguration config,
 						Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap)
 			throws Exception {
 
-		SecurityContext.install(config);
+		SecurityUtils.install(config);
 
 		// establish the JAAS config for Test environment
 		TestingJaasConfiguration jaasConfig = new TestingJaasConfiguration(config.getKeytab(),
@@ -77,4 +77,4 @@ public class TestingSecurityContext {
 
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 0725bf2..45fd8d0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestingSecurityContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -31,6 +31,8 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Callable;
+
 public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOSecuredITCase.class);
@@ -47,7 +49,7 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
 
 		SecureTestEnvironment.prepare(tmp);
 
-		populateYarnSecureConfigurations(yarnConfiguration,SecureTestEnvironment.getHadoopServicePrincipal(),
+		populateYarnSecureConfigurations(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(),
 				SecureTestEnvironment.getTestKeytab());
 
 		Configuration flinkConfig = new Configuration();
@@ -56,15 +58,14 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
 		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
 				SecureTestEnvironment.getHadoopServicePrincipal());
 
-		SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
-		ctx.setFlinkConfiguration(flinkConfig);
+		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
 		ctx.setHadoopConfiguration(yarnConfiguration);
 		try {
 			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
 
-			SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
 				@Override
-				public Integer run() {
+				public Integer call() {
 					startYARNSecureMode(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(),
 							SecureTestEnvironment.getTestKeytab());
 					return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 54072e2..ca18439 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -584,7 +584,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		//check if there is a JAAS config file
-		File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityContext.JAAS_CONF_FILENAME);
+		File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityUtils.JAAS_CONF_FILENAME);
 		if (jaasConfigFile.exists() && jaasConfigFile.isFile()) {
 			effectiveShipFiles.add(jaasConfigFile);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 8f2cc33..1826d43 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.process.ProcessReaper;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Hardware;
@@ -71,6 +71,7 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.UUID;
 import java.util.Collections;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -161,7 +162,18 @@ public class YarnApplicationMasterRunner {
 			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
 					currentUser.getShortUserName(), yarnClientUsername );
 
-			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+			// Flink configuration
+			final Map<String, String> dynamicProperties =
+				FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+
+			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
+
+			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
@@ -174,22 +186,11 @@ public class YarnApplicationMasterRunner {
 				sc.setHadoopConfiguration(conf);
 			}
 
-			// Flink configuration
-			final Map<String, String> dynamicProperties =
-					FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
-
-			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
-			if(keytabPath != null && remoteKeytabPrincipal != null) {
-				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
-				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
-			}
-
-			SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+			SecurityUtils.install(sc);
 
-			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+			return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 				@Override
-				public Integer run() {
+				public Integer call() {
 					return runApplicationMaster(flinkConfig);
 				}
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 21ed52e..015eb1b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -21,11 +21,12 @@ package org.apache.flink.yarn;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -110,7 +111,7 @@ public class YarnTaskManagerRunner {
 
 		try {
 
-			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
@@ -128,11 +129,11 @@ public class YarnTaskManagerRunner {
 				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
 
-			SecurityContext.install(sc.setFlinkConfiguration(configuration));
+			SecurityUtils.install(sc);
 
-			SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
 				@Override
-				public Integer run() {
+				public Integer call() {
 					try {
 						TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b2f3c06/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 1d10bd9..670f8a2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -33,7 +33,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -61,6 +61,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
 import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
@@ -468,10 +469,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	public static void main(final String[] args) throws Exception {
 		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
 		Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
-		SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
-		int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+		SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfiguration));
+		int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 			@Override
-			public Integer run() {
+			public Integer call() {
 				return cli.run(args);
 			}
 		});