Posted to commits@flink.apache.org by mx...@apache.org on 2016/09/20 20:08:58 UTC

[1/5] flink git commit: [FLINK-3929] additional fixes for keytab security

Repository: flink
Updated Branches:
  refs/heads/master 303f6fee9 -> 68709b087


[FLINK-3929] additional fixes for keytab security

- load flink-jaas.conf from classpath
- avoid using undocumented flink base dir config entry
- enable test cases to run on MacOS
- unify suffix of secure test cases
- fix error logging and reporting

This closes #2275
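
For readers skimming the diffs below: the first item, loading flink-jaas.conf
from the classpath, boils down to materializing a bundled resource as a
temporary file so that Kafka/ZK can consume it through the
java.security.auth.login.config system property. A minimal sketch of the
approach, assuming imports from java.io and java.nio.file (the actual change
is in SecurityContext below):

    // copy the bundled flink-jaas.conf resource to a temporary file
    Path jaasConfPath = Files.createTempFile("flink-jaas.conf", "");
    try (InputStream in = SecurityContext.class.getClassLoader()
            .getResourceAsStream("flink-jaas.conf")) {
        Files.copy(in, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
    }
    jaasConfPath.toFile().deleteOnExit();
    System.setProperty("java.security.auth.login.config",
            jaasConfPath.toAbsolutePath().toString());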


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68709b08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68709b08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68709b08

Branch: refs/heads/master
Commit: 68709b087570402cacb7bc3088e0eb35d83c8d32
Parents: 285b6f7
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 20 15:41:35 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  3 -
 .../src/main/flink-bin/conf/flink-jaas.conf     | 26 --------
 .../flink/runtime/security/SecurityContext.java | 67 ++++++++------------
 .../src/main/resources/flink-jaas.conf          | 26 ++++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  2 -
 .../runtime/security/SecurityContextTest.java   |  4 +-
 .../connectors/fs/RollingSinkSecuredITCase.java |  1 -
 .../kafka/Kafka09SecureRunITCase.java           | 62 ------------------
 .../kafka/Kafka09SecuredRunITCase.java          | 62 ++++++++++++++++++
 .../flink/test/util/SecureTestEnvironment.java  | 23 +++----
 .../org/apache/flink/yarn/YarnTestBase.java     |  3 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  1 -
 .../flink/yarn/YarnTaskManagerRunner.java       |  5 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 32 +++-------
 14 files changed, 138 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 575ffad..0711758 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -161,9 +161,6 @@ public class CliFrontend {
 				"filesystem scheme from configuration.", e);
 		}
 
-		this.config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDirectory.getAbsolutePath()
-				+ ".." + File.separator);
-
 		this.clientTimeout = AkkaUtils.getClientTimeout(config);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
deleted file mode 100644
index d476e24..0000000
--- a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
+++ /dev/null
@@ -1,26 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# We are using this file as an workaround for the Kafka and ZK SASL implementation
-# since they explicitly look for java.security.auth.login.config property
-# The file itself is not used by the application since the internal implementation
-# uses a process-wide in-memory java security configuration object.
-# Please do not edit/delete this file - See FLINK-3929
-sample {
-  useKeyTab=false
-  useTicketCache=true;
-};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 4b8b69b..be6611f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -22,7 +22,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -33,7 +34,12 @@ import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 
@@ -170,15 +176,12 @@ public class SecurityContext {
 	 * Kafka current code behavior.
 	 */
 	private static void populateSystemSecurityProperties(Configuration configuration) {
+		Preconditions.checkNotNull(configuration, "The supplied configuration was null");
 
 		//required to be empty for Kafka but we will override the property
 		//with pseudo JAAS configuration file if SASL auth is enabled for ZK
 		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
 
-		if(configuration == null) {
-			return;
-		}
-
 		boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
 				ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
 		if(disableSaslClient) {
@@ -188,46 +191,26 @@ public class SecurityContext {
 			return;
 		}
 
-		String baseDir = configuration.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
-		if(baseDir == null) {
-			String message = "SASL auth is enabled for ZK but unable to locate pseudo Jaas config " +
-					"since " + ConfigConstants.FLINK_BASE_DIR_PATH_KEY + " is not provided";
-			LOG.error(message);
-			throw new IllegalConfigurationException(message);
-		}
-
-		File f = new File(baseDir);
-		if(!f.exists() || !f.isDirectory()) {
-			LOG.error("Invalid flink base directory {} configuration provided", baseDir);
-			throw new IllegalConfigurationException("Invalid flink base directory configuration provided");
-		}
-
-		File jaasConfigFile = new File(f, JAAS_CONF_FILENAME);
-
-		if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
-
-			//check if there is a conf directory
-			File confDir = new File(f, "conf");
-			if(!confDir.exists() || !confDir.isDirectory()) {
-				LOG.error("Could not locate " + JAAS_CONF_FILENAME);
-				throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
-			}
-
-			jaasConfigFile = new File(confDir, JAAS_CONF_FILENAME);
-
-			if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
-				LOG.error("Could not locate " + JAAS_CONF_FILENAME);
-				throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
-			}
+		// load Jaas config file to initialize SASL
+		final File jaasConfFile;
+		try {
+			Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
+			InputStream jaasConfStream = SecurityContext.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
+			Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+			jaasConfFile = jaasConfPath.toFile();
+			jaasConfFile.deleteOnExit();
+		} catch (IOException e) {
+			throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
+				"locate pseudo Jaas config provided with Flink", e);
 		}
 
 		LOG.info("Enabling {} property with pseudo JAAS config file: {}",
-				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile);
+				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
 
 		//ZK client module lookup the configuration to handle SASL.
 		//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile.getAbsolutePath());
-		System.setProperty(ZOOKEEPER_SASL_CLIENT,"true");
+		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
+		System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
 
 		String zkSaslServiceName = configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
 		if(!StringUtils.isBlank(zkSaslServiceName)) {
@@ -250,6 +233,10 @@ public class SecurityContext {
 
 		String principal;
 
+		public SecurityConfiguration() {
+			this.flinkConf = GlobalConfiguration.loadConfiguration();
+		}
+
 		public String getKeytab() {
 			return keytab;
 		}
@@ -310,4 +297,4 @@ public class SecurityContext {
 		T run() throws Exception;
 	}
 
-}
\ No newline at end of file
+}
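
Note: ClassLoader#getResourceAsStream returns null when the resource is
missing, in which case the Files.copy call above fails with a
NullPointerException rather than the IOException that the catch block
reports. A small defensive variant (a suggestion, not part of this commit):

    InputStream jaasConfStream =
        SecurityContext.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
    if (jaasConfStream == null) {
        // surface a descriptive error instead of an NPE from Files.copy
        throw new IOException(JAAS_CONF_FILENAME + " not found on the classpath");
    }
    Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);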

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/resources/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/flink-jaas.conf b/flink-runtime/src/main/resources/flink-jaas.conf
new file mode 100644
index 0000000..7f0f06b
--- /dev/null
+++ b/flink-runtime/src/main/resources/flink-jaas.conf
@@ -0,0 +1,26 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# We use this file as a workaround for the Kafka and ZK SASL implementations,
+# since they explicitly look for the java.security.auth.login.config property.
+# The file itself is not used by the application, since the internal implementation
+# uses a process-wide in-memory Java security configuration object.
+# Please do not edit/delete this file - See FLINK-3929
+sample {
+  useKeyTab=false
+  useTicketCache=true;
+}
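
The "sample" entry above is deliberately a stub: the real login modules come
from the process-wide in-memory JAAS configuration, and this file only exists
so that java.security.auth.login.config points at something valid. For
comparison, a real keytab-based JAAS entry names a login module class, e.g.
(illustrative values only):

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="/path/to/flink.keytab"
        principal="flink-user@EXAMPLE.COM";
    };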

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8534ee1..9e2feb5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1583,8 +1583,6 @@ object TaskManager {
       }
     }
 
-    conf.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, cliConfig.getConfigDir() + "/..")
-
     conf
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
index 5f3d76a..3c48e8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
@@ -35,7 +35,7 @@ public class SecurityContextTest {
 		SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
 		try {
 			SecurityContext.install(sc);
-			assertEquals(UserGroupInformation.getLoginUser().getUserName(),getOSUserName());
+			assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName());
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}
@@ -59,7 +59,7 @@ public class SecurityContextTest {
 		if( osName.contains( "windows" ) ){
 			className = "com.sun.security.auth.module.NTSystem";
 		}
-		else if( osName.contains( "linux" ) ){
+		else if( osName.contains( "linux" ) || osName.contains( "mac" )  ){
 			className = "com.sun.security.auth.module.UnixSystem";
 		}
 		else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 930ddd2..051175a 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -227,7 +227,6 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 			TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
 
 		} catch (Exception e) {
-			LOG.error("Exception occured while creating MiniFlink cluster. Reason: {}", e);
 			throw new RuntimeException(e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
deleted file mode 100644
index d12ec65..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.test.util.SecureTestEnvironment;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/*
- * Kafka Secure Connection (kerberos) IT test case
- */
-public class Kafka09SecureRunITCase extends KafkaConsumerTestBase {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecureRunITCase.class);
-
-	@BeforeClass
-	public static void prepare() throws IOException, ClassNotFoundException {
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Starting Kafka09SecureRunITCase ");
-		LOG.info("-------------------------------------------------------------------------");
-
-		SecureTestEnvironment.prepare(tempFolder);
-		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
-
-		startClusters(true);
-	}
-
-	@AfterClass
-	public static void shutDownServices() {
-		shutdownClusters();
-		SecureTestEnvironment.cleanup();
-	}
-
-
-	//timeout interval is large since in Travis, ZK connection timeout occurs frequently
-	//The timeout for the test case is 2 times timeout of ZK connection
-	@Test(timeout = 600000)
-	public void testMultipleTopics() throws Exception {
-		runProduceConsumeMultipleTopics();
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
new file mode 100644
index 0000000..e748537
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/*
+ * Kafka Secure Connection (Kerberos) IT test case
+ */
+public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
+
+	@BeforeClass
+	public static void prepare() throws IOException, ClassNotFoundException {
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting Kafka09SecuredRunITCase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		SecureTestEnvironment.prepare(tempFolder);
+		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+		startClusters(true);
+	}
+
+	@AfterClass
+	public static void shutDownServices() {
+		shutdownClusters();
+		SecureTestEnvironment.cleanup();
+	}
+
+
+	//timeout interval is large since in Travis, ZK connection timeout occurs frequently
+	//The timeout for the test case is 2 times timeout of ZK connection
+	@Test(timeout = 600000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index 00b19f1..b5e622b 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.util;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.junit.rules.TemporaryFolder;
@@ -60,12 +61,10 @@ public class SecureTestEnvironment {
 
 	private static String hadoopServicePrincipal = null;
 
-	private static File baseDirForSecureRun = null;
-
 	public static void prepare(TemporaryFolder tempFolder) {
 
 		try {
-			baseDirForSecureRun = tempFolder.newFolder();
+			File baseDirForSecureRun = tempFolder.newFolder();
 			LOG.info("Base Directory for Secure Environment: {}", baseDirForSecureRun);
 
 			String hostName = "localhost";
@@ -113,19 +112,17 @@ public class SecureTestEnvironment {
 			//See Yarn test case module for reference
 			createJaasConfig(baseDirForSecureRun);
 			SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
-			Configuration flinkConfig = new Configuration();
+			Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
 			flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
 			flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
 			flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
-			flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirForSecureRun.getAbsolutePath());
 			ctx.setFlinkConfiguration(flinkConfig);
 			TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
 
-			populateSystemEnvVariables();
+			populateJavaPropertyVariables();
 
 		} catch(Exception e) {
-			LOG.error("Exception occured while preparing secure environment. Reason: {}", e);
-			throw new RuntimeException(e);
+			throw new RuntimeException("Exception occurred while preparing secure environment.", e);
 		}
 
 	}
@@ -145,14 +142,12 @@ public class SecureTestEnvironment {
 		testPrincipal = null;
 		testZkServerPrincipal = null;
 		hadoopServicePrincipal = null;
-		baseDirForSecureRun = null;
 
 	}
 
-	private static void populateSystemEnvVariables() {
+	private static void populateJavaPropertyVariables() {
 
 		if(LOG.isDebugEnabled()) {
-			System.setProperty("FLINK_JAAS_DEBUG", "true");
 			System.setProperty("sun.security.krb5.debug", "true");
 		}
 
@@ -165,7 +160,6 @@ public class SecureTestEnvironment {
 
 	private static void resetSystemEnvVariables() {
 		System.clearProperty("java.security.krb5.conf");
-		System.clearProperty("FLINK_JAAS_DEBUG");
 		System.clearProperty("sun.security.krb5.debug");
 
 		System.clearProperty("zookeeper.authProvider.1");
@@ -227,7 +221,7 @@ public class SecureTestEnvironment {
 	}
 
 	/*
-	 * Helper method to create a temporary JAAS configuration file to ger around the Kafka and ZK SASL
+	 * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL
 	 * implementation lookup java.security.auth.login.config
 	 */
 	private static void  createJaasConfig(File baseDirForSecureRun) {
@@ -241,8 +235,7 @@ public class SecureTestEnvironment {
 			out.println("useTicketCache=true;");
 			out.println("};");
 		} catch (IOException e) {
-			LOG.error("Exception occured while trying to create JAAS config. Reason: {}", e.getMessage());
-			throw new RuntimeException(e);
+			throw new RuntimeException("Exception occurred while trying to create JAAS config.", e);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 605aa44..afdd400 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -427,8 +427,7 @@ public abstract class YarnTestBase extends TestLogger {
 					out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
 					out.println("");
 				} catch (IOException e) {
-					LOG.error("Exception occured while trying to append the security configurations. Reason: {}", e.getMessage());
-					throw new RuntimeException(e);
+					throw new RuntimeException("Exception occurred while trying to append the security configurations.", e);
 				}
 
 				String configDir = tempConfPathForSecureRun.getAbsolutePath();

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index efb658a..b27876b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -176,7 +176,6 @@ public class YarnApplicationMasterRunner {
 				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
 				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
-			flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
 
 			SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index c70a30b..21ed52e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -127,7 +127,6 @@ public class YarnTaskManagerRunner {
 				configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
 				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
-			configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
 
 			SecurityContext.install(sc.setFlinkConfiguration(configuration));
 
@@ -145,9 +144,9 @@ public class YarnTaskManagerRunner {
 				}
 			});
 		} catch(Exception e) {
-			LOG.error("Exception occurred while launching Task Manager. Reason: {}", e);
+			LOG.error("Exception occurred while launching Task Manager", e);
 			throw new RuntimeException(e);
 		}
 
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index b5364f0..d09340c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,7 +24,6 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
@@ -463,27 +462,17 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		}
 	}
 
-	public static void main(final String[] args) {
-		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
-
-		String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
-		GlobalConfiguration.loadConfiguration(confDirPath);
+	public static void main(final String[] args) throws Exception {
+		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
 		Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
-		flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, confDirPath);
-		try {
-			SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
-			int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
-				@Override
-				public Integer run() {
-					return cli.run(args);
-				}
-			});
-			System.exit(retCode);
-		} catch(Exception e) {
-			e.printStackTrace();
-			LOG.error("Exception Occured. Reason: {}", e);
-			return;
-		}
+		SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
+		int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+			@Override
+			public Integer run() {
+				return cli.run(args);
+			}
+		});
+		System.exit(retCode);
 	}
 
 	@Override
@@ -544,7 +533,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		try {
 			return yarnClusterDescriptor.deploy();
 		} catch (Exception e) {
-			LOG.error("Error while deploying YARN cluster: "+e.getMessage(), e);
 			throw new RuntimeException("Error deploying the YARN cluster", e);
 		}
 


[2/5] flink git commit: [FLINK-3929] conditionally skip RollingSinkSecuredITCase

Posted by mx...@apache.org.
[FLINK-3929] conditionally skip RollingSinkSecuredITCase

- for now, we skip this test class until Hadoop version 3.x.x.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/285b6f74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/285b6f74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/285b6f74

Branch: refs/heads/master
Commit: 285b6f74e0416b200229bd67cb7521e4a6871bbc
Parents: 25a622f
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Sep 1 12:49:53 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkSecuredITCase.java | 37 +++++++++++++++-----
 .../connectors/kafka/KafkaTestBase.java         |  8 +++--
 2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/285b6f74/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 86cedaf..930ddd2 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -31,9 +31,10 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.VersionInfo;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,15 +58,31 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 
 /**
  * Tests for running {@link RollingSinkSecuredITCase}, an extension of {@link RollingSinkITCase}, in a secure environment.
+ * Note: only executed for Hadoop version 3.x.x or later
  */
-
-//The test is disabled since MiniDFS secure run requires lower order ports to be used.
-//We can enable the test when the fix is available (HDFS-9213)
-@Ignore
 public class RollingSinkSecuredITCase extends RollingSinkITCase {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
 
+	/**
+	 * Skips all tests if the Hadoop version doesn't match.
+	 * We can't run this test class until HDFS-9213 is fixed, which allows a secure DataNode
+	 * to bind to non-privileged ports for testing.
+	 * For now, we skip this test class until Hadoop version 3.x.x.
+	 */
+	private static void skipIfHadoopVersionIsNotAppropriate() {
+		// Skips all tests if the Hadoop version doesn't match
+		String hadoopVersionString = VersionInfo.getVersion();
+		String[] split = hadoopVersionString.split("\\.");
+		if (split.length != 3) {
+			throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString);
+		}
+		Assume.assumeTrue(
+			// check whether we're running Hadoop version >= 3.x.x
+			Integer.parseInt(split[0]) >= 3
+		);
+	}
+
 	/*
 	 * override super class static methods to avoid creating MiniDFS and MiniFlink with wrong configurations
 	 * and out-of-order sequence for secure cluster
@@ -85,6 +102,8 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 	@BeforeClass
 	public static void startSecureCluster() throws Exception {
 
+		skipIfHadoopVersionIsNotAppropriate();
+
 		LOG.info("starting secure cluster environment for testing");
 
 		dataDir = tempFolder.newFolder();
@@ -143,7 +162,9 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 		TestStreamEnvironment.unsetAsContext();
 		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
 
-		hdfsCluster.shutdown();
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
 
 		SecureTestEnvironment.cleanup();
 	}
@@ -229,4 +250,4 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 	@Override
 	public void testDateTimeRollingStringWriter() throws Exception {}
 
-}
\ No newline at end of file
+}
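
The skip mechanism relies on JUnit 4 semantics: a failed assumption inside a
@BeforeClass method (here via Assume.assumeTrue) marks the whole class as
skipped rather than failed. A condensed sketch of the pattern (hypothetical
test class, not part of this commit):

    import org.apache.hadoop.util.VersionInfo;
    import org.junit.Assume;
    import org.junit.BeforeClass;

    public class SomeHadoop3OnlyITCase {
        @BeforeClass
        public static void checkHadoopVersion() {
            String[] v = VersionInfo.getVersion().split("\\.");
            // skips (does not fail) every test in this class on Hadoop < 3.x.x
            Assume.assumeTrue(Integer.parseInt(v[0]) >= 3);
        }
    }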

http://git-wip-us.apache.org/repos/asf/flink/blob/285b6f74/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index afdd158..5cec4f0 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -110,7 +110,7 @@ public abstract class KafkaTestBase extends TestLogger {
 	}
 
 	protected static Configuration getFlinkConfiguration() {
-		Configuration flinkConfig = new Configuration();;
+		Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
@@ -134,7 +134,11 @@ public abstract class KafkaTestBase extends TestLogger {
 
 		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
 
-		if(kafkaServer.isSecureRunSupported() && secureMode) {
+		if (secureMode) {
+			if (!kafkaServer.isSecureRunSupported()) {
+				throw new IllegalStateException(
+					"Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
+			}
 			secureProps = kafkaServer.getSecureProperties();
 		}
 


[5/5] flink git commit: [FLINK-3929] Added Keytab based Kerberos support to enable secure Flink cluster deployment (addresses HDFS, Kafka and ZK services)

Posted by mx...@apache.org.
[FLINK-3929] Added Keytab based Kerberos support to enable secure Flink cluster deployment (addresses HDFS, Kafka and ZK services)

FLINK-3929 Added MiniKDC support for Kafka, Zookeeper, RollingFS and Yarn integration test modules


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25a622fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25a622fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25a622fd

Branch: refs/heads/master
Commit: 25a622fd9c9255bd1a5b4b6ff7891730dce34ac1
Parents: 303f6fe
Author: Vijay Srinivasaraghavan <vi...@emc.com>
Authored: Wed Jul 20 17:08:33 2016 -0700
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200

----------------------------------------------------------------------
 docs/internals/flink_security.md                |  87 ++++++
 docs/setup/config.md                            |  27 +-
 .../org/apache/flink/client/CliFrontend.java    |  38 +--
 .../flink/configuration/ConfigConstants.java    |  22 ++
 .../org/apache/flink/util/Preconditions.java    |   2 +-
 .../src/main/flink-bin/conf/flink-jaas.conf     |  26 ++
 flink-dist/src/main/resources/flink-conf.yaml   |  25 ++
 .../MesosApplicationMasterRunner.java           |   3 +-
 .../clusterframework/BootstrapTools.java        |   7 +
 .../runtime/security/JaasConfiguration.java     | 160 ++++++++++
 .../flink/runtime/security/SecurityContext.java | 313 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  34 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  25 +-
 .../runtime/security/JaasConfigurationTest.java |  52 +++
 .../runtime/security/SecurityContextTest.java   |  77 +++++
 .../flink-connector-filesystem/pom.xml          |  36 +++
 .../connectors/fs/RollingSinkITCase.java        |  19 +-
 .../connectors/fs/RollingSinkSecuredITCase.java | 232 ++++++++++++++
 .../src/test/resources/log4j-test.properties    |   2 +
 .../connectors/kafka/Kafka08ProducerITCase.java |   1 -
 .../kafka/KafkaTestEnvironmentImpl.java         |  13 +-
 .../flink-connector-kafka-0.9/pom.xml           |  18 ++
 .../connectors/kafka/Kafka09ITCase.java         |  14 +-
 .../connectors/kafka/Kafka09ProducerITCase.java |   1 -
 .../kafka/Kafka09SecureRunITCase.java           |  62 ++++
 .../kafka/KafkaTestEnvironmentImpl.java         |  81 ++++-
 .../src/test/resources/log4j-test.properties    |   2 +
 .../flink-connector-kafka-base/pom.xml          |  19 ++
 .../connectors/kafka/KafkaConsumerTestBase.java | 100 ++++--
 .../connectors/kafka/KafkaProducerTestBase.java |  14 +-
 .../kafka/KafkaShortRetentionTestBase.java      |  29 +-
 .../connectors/kafka/KafkaTestBase.java         |  72 +++--
 .../connectors/kafka/KafkaTestEnvironment.java  |  11 +-
 .../kafka/testutils/DataGenerators.java         |   9 +-
 .../flink-test-utils/pom.xml                    |  25 ++
 .../util/StreamingMultipleProgramsTestBase.java |   6 +
 .../flink/test/util/SecureTestEnvironment.java  | 249 +++++++++++++++
 .../test/util/TestingJaasConfiguration.java     | 106 +++++++
 .../flink/test/util/TestingSecurityContext.java |  80 +++++
 flink-yarn-tests/pom.xml                        |  20 ++
 .../flink/yarn/FlinkYarnSessionCliTest.java     |   2 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  14 +-
 .../YARNSessionCapacitySchedulerITCase.java     |  23 ++
 .../flink/yarn/YARNSessionFIFOITCase.java       |  24 ++
 .../yarn/YARNSessionFIFOSecuredITCase.java      | 103 ++++++
 .../org/apache/flink/yarn/YarnTestBase.java     |  87 +++++-
 .../src/test/resources/log4j-test.properties    |   5 +
 .../yarn/AbstractYarnClusterDescriptor.java     | 104 ++++--
 .../main/java/org/apache/flink/yarn/Utils.java  |   8 +
 .../flink/yarn/YarnApplicationMasterRunner.java | 158 ++++++++--
 .../org/apache/flink/yarn/YarnConfigKeys.java   |   6 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  88 ++++--
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  28 +-
 pom.xml                                         |   7 +
 tools/log4j-travis.properties                   |   3 +
 55 files changed, 2553 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
new file mode 100644
index 0000000..846273b
--- /dev/null
+++ b/docs/internals/flink_security.md
@@ -0,0 +1,87 @@
+---
+title:  "Flink Security"
+# Top navigation
+top-nav-group: internals
+top-nav-pos: 10
+top-nav-title: Flink Security
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This document briefly describes how Flink security works in the context of various deployment mechanisms (standalone/cluster vs. YARN)
+and the connectors that participate in the Flink job execution stage. This documentation can be helpful for both administrators and developers
+who plan to run Flink in a secure environment.
+
+## Objective
+
+The primary goal of the Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario,
+streaming jobs are understood to run for long periods of time (days/weeks/months), and the system must be able to authenticate against secure
+data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) under the
+context of a Kerberos identity based on a keytab credential supplied at deployment time. Any job submitted will continue to run under the identity of the cluster.
+
+## How Flink Security works
+A Flink deployment includes a running Job Manager/ZooKeeper, Task Manager(s), the Web UI, and job(s). Jobs (user code) can be submitted through the web UI and/or CLI.
+A job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis, etc.), and each connector may have specific security
+requirements (Kerberos, database based, SSL/TLS, custom, etc.). While satisfying the security requirements of all connectors is an ongoing effort,
+at the time of writing the following connectors/services are tested for Kerberos/keytab based security.
+
+- Kafka (0.9)
+- HDFS
+- ZooKeeper
+
+Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation
+(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context.
+
+Services like Kafka and ZooKeeper use a SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. They expect a JAAS configuration with a platform-specific login
+module *name* to be provided. Managing per-connector configuration files would be an overhead; to overcome this, a process-wide JAAS configuration object is
+instantiated which serves standard AppConfigurationEntry instances for the connectors that authenticate using the SASL/JAAS mechanism.
+
+It is important to understand that the Flink processes (JM/TM/UI/jobs) themselves use UGI's doAs() implementation to run under a specific user context, i.e. if Hadoop security is enabled
+then the Flink processes will run under a secure user account, or else they will run as the OS login user account that starts the Flink cluster.
+
+## Security Configurations
+
+Secure credentials can be supplied by adding the configuration elements below to the Flink configuration file:
+
+- `security.keytab`: Absolute path to the Kerberos keytab file that contains the user credentials/secret.
+
+- `security.principal`: User principal name that the Flink cluster should run as.
+
+The delegation token mechanism (*kinit cache*) is still supported for backward compatibility, but enabling security using the *keytab* configuration is the preferred and recommended approach.
+
+## Standalone Mode
+
+Steps to run a secure Flink cluster in standalone/cluster mode:
+- Add the security configurations to the Flink configuration file (on all cluster nodes)
+- Make sure the keytab file exists in the path indicated by the *security.keytab* configuration on all cluster nodes
+- Deploy the Flink cluster using the cluster start/stop scripts or the CLI
+
+## YARN Mode
+
+Steps to run a secure Flink cluster in YARN mode:
+- Add the security configurations to the Flink configuration file (on the node from which the cluster will be provisioned using the Flink/YARN CLI)
+- Make sure the keytab file exists in the path indicated by the *security.keytab* configuration
+- Deploy the Flink cluster using the CLI
+
+In YARN mode, the user-supplied keytab will be copied over to the YARN containers (App Master/JM and TM) as a YARN local resource file.
+Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a>.
+
+## Token Renewal
+
+UGI and the Kafka/ZK login module implementations take care of auto-renewing the tickets upon expiry, and no further action is needed on the part of Flink.
\ No newline at end of file
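
For a concrete picture of how these settings are consumed, the bootstrap
pattern used across the entry points in this patch set is install-then-run.
A sketch assembled from the CliFrontend and FlinkYarnSessionCli changes in
this mail (error handling elided; doMain() is a hypothetical application
entry point):

    Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
    // security.keytab / security.principal are read from the loaded configuration
    SecurityContext.install(new SecurityContext.SecurityConfiguration()
            .setFlinkConfiguration(flinkConfig));
    int retCode = SecurityContext.getInstalled().runSecured(
            new SecurityContext.FlinkSecuredRunner<Integer>() {
                @Override
                public Integer run() {
                    return doMain();
                }
            });
    System.exit(retCode);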

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 51475cc..54ef394 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -95,18 +95,35 @@ These options are useful for debugging a Flink application for memory and garbag
 
 ### Kerberos
 
-Flink supports Kerberos authentication of Hadoop services such as HDFS, YARN, or HBase.
+Flink supports Kerberos authentication for the following services:
+
++ Hadoop components such as HDFS, YARN, or HBase
++ Kafka connectors (version 0.9+)
++ ZooKeeper server/client
+
+Hadoop components rely on the UserGroupInformation (UGI) implementation to handle Kerberos authentication, whereas the Kafka and ZooKeeper services handle Kerberos authentication through a SASL/JAAS implementation.
 
 **Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
   other versions have critical bugs which might fail the Flink job
   unexpectedly.**
 
+**Ticket cache** and **keytab** modes are supported for all of the above-mentioned services.
+
+> Ticket cache (supported only for backward compatibility; keytab is the preferred approach for long-running jobs)
+
 While Hadoop uses Kerberos tickets to authenticate users with services initially, the authentication process continues differently afterwards. Instead of saving the ticket to authenticate on a later access, Hadoop creates its own security tokens (DelegationToken) that it passes around. These are authenticated to Kerberos periodically but are independent of the token renewal time. The tokens have a maximum life span identical to the Kerberos ticket maximum life span.
 
-Please make sure to set the maximum ticket life span high long running jobs. The renewal time of the ticket, on the other hand, is not important because Hadoop abstracts this away using its own security tocken renewal system. Hadoop makes sure that tickets are renewed in time and you can be sure to be authenticated until the end of the ticket life time.
+While using ticket cache mode, please make sure to set the maximum ticket life span high for long-running jobs.
 
 If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.
 
+> Keytab (the security principal and keytab can be configured through the Flink configuration file)
+- `security.keytab`: Path to the keytab file
+- `security.principal`: Principal associated with the keytab
+
+Kerberos ticket renewal is abstracted and automatically handled by the Hadoop/Kafka/ZK login modules, which ensure that tickets are renewed in time, so you can be sure to be authenticated until the end of the ticket lifetime.
+
+For Kafka and ZK, a process-wide JAAS config will be created using the provided security credentials, and Kerberos authentication will be handled by the Kafka/ZK login handlers.
 
 ### Other
 
@@ -315,6 +332,12 @@ Previously this key was named `recovery.mode` and the default value was `standal
 
 - `high-availability.job.delay`: (Default `akka.ask.timeout`) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named `recovery.job.delay`.
 
+### ZooKeeper-Security
+
+- `zookeeper.sasl.disable`: (Default: `true`) Defines whether SASL based authentication is disabled. Set the value to `false` if the ZooKeeper cluster is running in secure mode (Kerberos).
+
+- `zookeeper.sasl.service-name`: (Default: `zookeeper`) If the ZooKeeper server is configured with a service name other than the default "zookeeper", it can be supplied using this option. A mismatch in service name between client and server configuration will cause authentication to fail.
+
 ## Environment
 
 - `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. It has to be an absolute path.
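
Putting the new ZooKeeper options together, a flink-conf.yaml fragment for a
Kerberos-secured ZooKeeper quorum might look like this (values illustrative):

    # enable SASL authentication towards a secure ZooKeeper cluster
    zookeeper.sasl.disable: false
    # only needed if the server uses a non-default service name
    zookeeper.sasl.service-name: zookeeper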

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index c7fb647..575ffad 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -66,7 +66,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -161,6 +161,9 @@ public class CliFrontend {
 				"filesystem scheme from configuration.", e);
 		}
 
+		this.config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDirectory.getAbsolutePath()
+				+ ".." + File.separator);
+
 		this.clientTimeout = AkkaUtils.getClientTimeout(config);
 	}
 
@@ -982,25 +985,7 @@ public class CliFrontend {
 		// do action
 		switch (action) {
 			case ACTION_RUN:
-				// run() needs to run in a secured environment for the optimizer.
-				if (SecurityUtils.isSecurityEnabled()) {
-					String message = "Secure Hadoop environment setup detected. Running in secure context.";
-					LOG.info(message);
-
-					try {
-						return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() {
-							@Override
-							public Integer run() throws Exception {
-								return CliFrontend.this.run(params);
-							}
-						});
-					}
-					catch (Exception e) {
-						return handleError(e);
-					}
-				} else {
-					return run(params);
-				}
+				return CliFrontend.this.run(params);
 			case ACTION_LIST:
 				return list(params);
 			case ACTION_INFO:
@@ -1037,12 +1022,19 @@ public class CliFrontend {
 	/**
 	 * Submits the job based on the arguments
 	 */
-	public static void main(String[] args) {
+	public static void main(final String[] args) {
 		EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
 
 		try {
-			CliFrontend cli = new CliFrontend();
-			int retCode = cli.parseParameters(args);
+			final CliFrontend cli = new CliFrontend();
+			SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(cli.config));
+			int retCode = SecurityContext.getInstalled()
+					.runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+						@Override
+						public Integer run() {
+							return cli.parseParameters(args);
+						}
+					});
 			System.exit(retCode);
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f0f1b6b..9e66e2a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -758,6 +758,12 @@ public final class ConfigConstants {
 	@PublicEvolving
 	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts";
 
+	@PublicEvolving
+	public static final String ZOOKEEPER_SASL_DISABLE = "zookeeper.sasl.disable";
+
+	@PublicEvolving
+	public static final String ZOOKEEPER_SASL_SERVICE_NAME = "zookeeper.sasl.service-name";
+
 	/** Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
 	@Deprecated
 	public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
@@ -1233,6 +1239,9 @@ public final class ConfigConstants {
 	/** ZooKeeper default leader port. */
 	public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
 
+	/** Defaults for ZK client security. */
+	public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true;
+
 	// ------------------------- Queryable state ------------------------------
 
 	/** Port to bind KvState server to. */
@@ -1279,6 +1288,19 @@ public final class ConfigConstants {
 	/** The environment variable name which contains the location of the lib folder */
 	public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
+	// -------------------------------- Security -------------------------------
+
+	/*
+	 * The config parameters defining the security credentials required
+	 * for securing a Flink cluster.
+	 */
+
+	/** Keytab file key name to be used in flink configuration file */
+	public static final String SECURITY_KEYTAB_KEY = "security.keytab";
+
+	/** Kerberos security principal key name to be used in flink configuration file */
+	public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
+
 
 	/**
 	 * Not instantiable.

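The new keys are read like any other Flink configuration entry. A small sketch
(the empty Configuration and the printed summary are illustration only):

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    public class SecurityConfigReaderSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // both keys default to null; keytab and principal must be set together
            String keytab = conf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
            String principal = conf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
            // ZK SASL stays disabled unless explicitly enabled in the configuration
            boolean zkSaslDisabled = conf.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
                    ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
            System.out.println(keytab + " / " + principal + " / zkSaslDisabled=" + zkSaslDisabled);
        }
    }
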
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
index ea6b9dd..e970c13 100644
--- a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
+++ b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
@@ -293,7 +293,7 @@ public final class Preconditions {
 
 		return builder.toString();
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	/** Private constructor to prevent instantiation */

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
new file mode 100644
index 0000000..d476e24
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
@@ -0,0 +1,26 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# We are using this file as a workaround for the Kafka and ZK SASL implementations
+# since they explicitly look for the java.security.auth.login.config property.
+# The file itself is not used by the application since the internal implementation
+# uses a process-wide in-memory Java security configuration object.
+# Please do not edit/delete this file - See FLINK-3929
+sample {
+  useKeyTab=false
+  useTicketCache=true;
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 27fd84a..c876922 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -139,6 +139,31 @@ jobmanager.web.port: 8081
 #
 # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
 #
+
 # high-availability: zookeeper
 # high-availability.zookeeper.quorum: localhost:2181
 # high-availability.zookeeper.storageDir: hdfs:///flink/ha/
+
+#==============================================================================
+# Flink Cluster Security Configuration (optional configuration)
+#==============================================================================
+
+# Kerberos security for the connectors can be enabled by providing the below configuration
+# Security works in two modes - a keytab/principal combination or the Kerberos ticket cache
+# If keytab and principal are not provided, the ticket cache (manual kinit) will be used
+
+#security.keytab: /path/to/kerberos/keytab
+#security.principal: flink-user
+
+#==============================================================================
+# ZK Security Configuration (optional configuration)
+#==============================================================================
+# The below configurations are applicable if the ZK quorum is configured for Kerberos security
+
+# SASL authentication is disabled by default and can be enabled by changing the value to false
+#
+# zookeeper.sasl.disable: true
+
+# Override the below configuration to provide a custom ZK service name if needed
+#
+# zookeeper.sasl.service-name: zookeeper
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 8fb6af4..5ec39c2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -582,10 +582,11 @@ public class MesosApplicationMasterRunner {
 		// build the launch command
 		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
 		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
+		boolean hasKrb5 = false;
 
 		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
 			flinkConfig, tmParams.containeredParameters(), ".", ".",
-			hasLogback, hasLog4j, taskManagerMainClass);
+			hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
 		cmd.setValue(launchCommand);
 
 		// build the environment variables

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index c9748cb..d844f5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -308,6 +308,7 @@ public class BootstrapTools {
 			String logDirectory,
 			boolean hasLogback,
 			boolean hasLog4j,
+			boolean hasKrb5,
 			Class<?> mainClass) {
 
 		StringBuilder tmCommand = new StringBuilder("$JAVA_HOME/bin/java");
@@ -328,6 +329,12 @@ public class BootstrapTools {
 				tmCommand.append(" -Dlog4j.configuration=file:")
 						.append(configDirectory).append("/log4j.properties");
 			}
+
+			//applicable only for secure YarnMiniCluster test runs;
+			//the krb5.conf file will be available as a local resource in the JM/TM container
+			if(hasKrb5) {
+				tmCommand.append(" -Djava.security.krb5.conf=krb5.conf");
+			}
 		}
 
 		tmCommand.append(' ').append(mainClass.getName());

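A sketch of a caller of the extended method, mirroring the Mesos runner above
(the ContaineredTaskManagerParameters instance and the working directory are
assumed to be built elsewhere):

    import java.io.File;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.clusterframework.BootstrapTools;
    import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
    import org.apache.flink.runtime.taskmanager.TaskManager;

    public class LaunchCommandSketch {
        static String buildLaunchCommand(Configuration flinkConfig,
                ContaineredTaskManagerParameters tmParams, File workingDirectory) {
            boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
            boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
            // krb5.conf is shipped as a local resource only in secure YARN test runs
            boolean hasKrb5 = new File(workingDirectory, "krb5.conf").exists();

            return BootstrapTools.getTaskManagerShellCommand(
                flinkConfig, tmParams, ".", ".",
                hasLogback, hasLog4j, hasKrb5, TaskManager.class);
        }
    }
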
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
new file mode 100644
index 0000000..c4527dd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ * JAAS configuration provider object that provides a default LoginModule for the various connectors that support
+ * JAAS/SASL based Kerberos authentication. The implementation is inspired by the Hadoop UGI class.
+ *
+ * Different connectors use different login module names to implement JAAS based authentication support.
+ * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expects the
+ * name to be "client". This puts the responsibility on the Flink cluster administrator to configure/provide the right
+ * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides
+ * a standard lookup to get the login module entry for JAAS based authentication to work.
+ *
+ * The HDFS connector is not impacted by this configuration since it uses a UGI based mechanism to authenticate.
+ *
+ * <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html">Configuration</a>
+ *
+ */
+
+@Internal
+public class JaasConfiguration extends Configuration {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class);
+
+	public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
+
+	public static final boolean IBM_JAVA;
+
+	private static final Map<String, String> debugOptions = new HashMap<>();
+
+	private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
+
+	private static final Map<String, String> keytabKerberosOptions = new HashMap<>();
+
+	private static final AppConfigurationEntry userKerberosAce;
+
+	private AppConfigurationEntry keytabKerberosAce = null;
+
+	static {
+
+		IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
+
+		if(LOG.isDebugEnabled()) {
+			debugOptions.put("debug", "true");
+		}
+
+		if(IBM_JAVA) {
+			kerberosCacheOptions.put("useDefaultCcache", "true");
+		} else {
+			kerberosCacheOptions.put("doNotPrompt", "true");
+			kerberosCacheOptions.put("useTicketCache", "true");
+		}
+
+		String ticketCache = System.getenv("KRB5CCNAME");
+		if(ticketCache != null) {
+			if(IBM_JAVA) {
+				System.setProperty("KRB5CCNAME", ticketCache);
+			} else {
+				kerberosCacheOptions.put("ticketCache", ticketCache);
+			}
+		}
+
+		kerberosCacheOptions.put("renewTGT", "true");
+		kerberosCacheOptions.putAll(debugOptions);
+
+		userKerberosAce = new AppConfigurationEntry(
+				KerberosUtil.getKrb5LoginModuleName(),
+				AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
+				kerberosCacheOptions);
+
+	}
+
+	protected JaasConfiguration(String keytab, String principal) {
+
+		LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal);
+
+		if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
+				(!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal))){
+			throw new RuntimeException("Both keytab and principal are required and cannot be empty");
+		}
+
+		if(!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
+
+			if(IBM_JAVA) {
+				keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
+				keytabKerberosOptions.put("credsType", "both");
+			} else {
+				keytabKerberosOptions.put("keyTab", keytab);
+				keytabKerberosOptions.put("doNotPrompt", "true");
+				keytabKerberosOptions.put("useKeyTab", "true");
+				keytabKerberosOptions.put("storeKey", "true");
+			}
+
+			keytabKerberosOptions.put("principal", principal);
+			keytabKerberosOptions.put("refreshKrb5Config", "true");
+			keytabKerberosOptions.putAll(debugOptions);
+
+			keytabKerberosAce = new AppConfigurationEntry(
+					KerberosUtil.getKrb5LoginModuleName(),
+					AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+					keytabKerberosOptions);
+		}
+	}
+
+	public static Map<String, String> getKeytabKerberosOptions() {
+		return keytabKerberosOptions;
+	}
+
+	private static String prependFileUri(String keytabPath) {
+		File f = new File(keytabPath);
+		return f.toURI().toString();
+	}
+
+	@Override
+	public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
+
+		LOG.debug("JAAS configuration requested for the application entry: {}", applicationName);
+
+		AppConfigurationEntry[] appConfigurationEntry;
+
+		if(keytabKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce};
+		} else {
+			appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce};
+		}
+
+		return appConfigurationEntry;
+	}
+
+}
\ No newline at end of file

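Mirroring the unit test further below, installing the configuration process-wide
looks roughly like this (a sketch: the keytab path and principal are placeholders,
and the class sits in the same package because the constructor is protected):

    package org.apache.flink.runtime.security;

    import javax.security.auth.login.AppConfigurationEntry;

    public class JaasConfigurationSketch {
        public static void main(String[] args) {
            // placeholder credentials; pass (null, null) for ticket-cache-only mode
            JaasConfiguration jaasConfig =
                new JaasConfiguration("/path/to/user.keytab", "flink-user");
            javax.security.auth.login.Configuration.setConfiguration(jaasConfig);

            // the same Krb5 login module entries are returned for whatever
            // application name a connector asks for
            AppConfigurationEntry[] entries =
                jaasConfig.getAppConfigurationEntry("KafkaClient");
            System.out.println(entries.length + " login module entries resolved");
        }
    }
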
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
new file mode 100644
index 0000000..4b8b69b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+
+/*
+ * Process-wide security context object which initializes UGI with the appropriate security credentials and also
+ * creates an in-memory JAAS configuration object which will serve the appropriate AppConfigurationEntry for the
+ * connector login module implementations that authenticate a Kerberos identity using a SASL/JAAS based mechanism.
+ */
+@Internal
+public class SecurityContext {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SecurityContext.class);
+
+	public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
+
+	private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+	private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
+
+	private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+	private static SecurityContext installedContext;
+
+	public static SecurityContext getInstalled() { return installedContext; }
+
+	private UserGroupInformation ugi;
+
+	SecurityContext(UserGroupInformation ugi) {
+		if(ugi == null) {
+			throw new RuntimeException("UGI passed cannot be null");
+		}
+		this.ugi = ugi;
+	}
+
+	public <T> T runSecured(final FlinkSecuredRunner<T> runner) throws Exception {
+		return ugi.doAs(new PrivilegedExceptionAction<T>() {
+			@Override
+			public T run() throws Exception {
+				return runner.run();
+			}
+		});
+	}
+
+	public static void install(SecurityConfiguration config) throws Exception {
+
+		// perform static initialization of UGI, JAAS
+		if(installedContext != null) {
+			LOG.warn("overriding previous security context");
+		}
+
+		// establish the JAAS config
+		JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
+		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+
+		populateSystemSecurityProperties(config.flinkConf);
+
+		// establish the UGI login user
+		UserGroupInformation.setConfiguration(config.hadoopConf);
+
+		UserGroupInformation loginUser;
+
+		if(UserGroupInformation.isSecurityEnabled() &&
+				config.keytab != null && !StringUtils.isBlank(config.principal)) {
+			String keytabPath = (new File(config.keytab)).getAbsolutePath();
+
+			UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
+
+			loginUser = UserGroupInformation.getLoginUser();
+
+			// supplement with any available tokens
+			String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+			if(fileLocation != null) {
+				/*
+				 * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
+				 * used in the context of reading the stored tokens from UGI.
+				 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+				 * loginUser.addCredentials(cred);
+				*/
+				try {
+					Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+							File.class, org.apache.hadoop.conf.Configuration.class);
+					Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null,new File(fileLocation),
+							config.hadoopConf);
+					Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
+							Credentials.class);
+					addCredentialsMethod.invoke(loginUser,cred);
+				} catch(NoSuchMethodException e) {
+					LOG.warn("Could not find method implementations in the shaded jar.", e);
+				}
+			}
+		} else {
+			// login with current user credentials (e.g. ticket cache)
+			try {
+				//Use reflection API to get the login user object
+				//UserGroupInformation.loginUserFromSubject(null);
+				Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+				Subject subject = null;
+				loginUserFromSubjectMethod.invoke(null,subject);
+			} catch(NoSuchMethodException e) {
+				LOG.warn("Could not find method implementations in the shaded jar.", e);
+			}
+
+			loginUser = UserGroupInformation.getLoginUser();
+			// note that the stored tokens are read automatically
+		}
+
+		boolean delegationToken = false;
+		final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
+		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+		for(Token<? extends TokenIdentifier> token : usrTok) {
+			final Text id = new Text(token.getIdentifier());
+			LOG.debug("Found user token " + id + " with " + token);
+			if(token.getKind().equals(HDFS_DELEGATION_KIND)) {
+				delegationToken = true;
+			}
+		}
+
+		if(UserGroupInformation.isSecurityEnabled() && !loginUser.hasKerberosCredentials()) {
+			//throw an error in non-yarn deployment if kerberos cache is not available
+			if(!delegationToken) {
+				LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+				throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+			}
+		}
+
+		installedContext = new SecurityContext(loginUser);
+	}
+
+	/*
+	 * This method configures some of the system properties that are required for ZK and Kafka SASL authentication.
+	 * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+	 * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
+	 * The java.security.auth.login.config property is set here only to support the current
+	 * ZK and Kafka code behavior.
+	 */
+	private static void populateSystemSecurityProperties(Configuration configuration) {
+
+		//required to be empty for Kafka, but we will override the property
+		//with the pseudo JAAS configuration file if SASL auth is enabled for ZK
+		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+
+		if(configuration == null) {
+			return;
+		}
+
+		boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
+				ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
+		if(disableSaslClient) {
+			LOG.info("SASL client auth for ZK will be disabled");
+			//SASL auth is disabled by default but will be enabled if specified in configuration
+			System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
+			return;
+		}
+
+		String baseDir = configuration.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
+		if(baseDir == null) {
+			String message = "SASL auth is enabled for ZK but unable to locate the pseudo JAAS config " +
+					"since " + ConfigConstants.FLINK_BASE_DIR_PATH_KEY + " is not provided";
+			LOG.error(message);
+			throw new IllegalConfigurationException(message);
+		}
+
+		File f = new File(baseDir);
+		if(!f.exists() || !f.isDirectory()) {
+			LOG.error("Invalid flink base directory {} configuration provided", baseDir);
+			throw new IllegalConfigurationException("Invalid flink base directory configuration provided");
+		}
+
+		File jaasConfigFile = new File(f, JAAS_CONF_FILENAME);
+
+		if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
+
+			//check if there is a conf directory
+			File confDir = new File(f, "conf");
+			if(!confDir.exists() || !confDir.isDirectory()) {
+				LOG.error("Could not locate " + JAAS_CONF_FILENAME);
+				throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
+			}
+
+			jaasConfigFile = new File(confDir, JAAS_CONF_FILENAME);
+
+			if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
+				LOG.error("Could not locate " + JAAS_CONF_FILENAME);
+				throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
+			}
+		}
+
+		LOG.info("Enabling {} property with pseudo JAAS config file: {}",
+				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile);
+
+		//The ZK client module looks up this configuration to handle SASL.
+		//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
+		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile.getAbsolutePath());
+		System.setProperty(ZOOKEEPER_SASL_CLIENT,"true");
+
+		String zkSaslServiceName = configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
+		if(!StringUtils.isBlank(zkSaslServiceName)) {
+			LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
+			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
+		}
+
+	}
+
+	/**
+	 * Inputs for establishing the security context.
+	 */
+	public static class SecurityConfiguration {
+
+		Configuration flinkConf;
+
+		org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+
+		String keytab;
+
+		String principal;
+
+		public String getKeytab() {
+			return keytab;
+		}
+
+		public String getPrincipal() {
+			return principal;
+		}
+
+		public SecurityConfiguration setFlinkConfiguration(Configuration flinkConf) {
+
+			this.flinkConf = flinkConf;
+
+			String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+
+			String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+
+			validate(keytab, principal);
+
+			LOG.debug("Keytab: {}, principal: {}", keytab, principal);
+
+			this.keytab = keytab;
+
+			this.principal = principal;
+
+			return this;
+		}
+
+		public SecurityConfiguration setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
+			this.hadoopConf = conf;
+			return this;
+		}
+
+		private void validate(String keytab, String principal) {
+
+			if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
+					!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal)) {
+				if(StringUtils.isBlank(keytab)) {
+					LOG.warn("Keytab is null or empty");
+				}
+				if(StringUtils.isBlank(principal)) {
+					LOG.warn("Principal is null or empty");
+				}
+				throw new RuntimeException("Requires both keytab and principal to be provided");
+			}
+
+			if(!StringUtils.isBlank(keytab)) {
+				File keytabFile = new File(keytab);
+				if(!keytabFile.exists() || !keytabFile.isFile()) {
+					LOG.warn("Not a valid keytab file: {}", keytab);
+					throw new RuntimeException("Invalid keytab file passed: " + keytab);
+				}
+			}
+
+		}
+	}
+
+	public interface FlinkSecuredRunner<T> {
+		T run() throws Exception;
+	}
+
+}
\ No newline at end of file

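The keytab-based variant of installing the context, as the secure tests further
below use it, looks roughly like this (a sketch: the keytab path and principal
are placeholders and must point at real credentials, since setFlinkConfiguration()
validates that the keytab file exists):

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.security.SecurityContext;

    public class KeytabContextSketch {
        public static void main(String[] args) throws Exception {
            Configuration flinkConfig = new Configuration();
            flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, "/path/to/user.keytab");
            flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, "flink-user");

            SecurityContext.SecurityConfiguration sc =
                new SecurityContext.SecurityConfiguration()
                    .setFlinkConfiguration(flinkConfig)
                    .setHadoopConfiguration(new org.apache.hadoop.conf.Configuration());

            SecurityContext.install(sc);
            SecurityContext.getInstalled().runSecured(
                new SecurityContext.FlinkSecuredRunner<Void>() {
                    @Override
                    public Void run() {
                        return null; // work executed as the Kerberos login user
                    }
                });
        }
    }
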
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9c844ba..639c158 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -72,8 +72,8 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
-import org.apache.flink.runtime.security.SecurityUtils
-import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
+import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
+import org.apache.flink.runtime.security.SecurityContext
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -2062,26 +2062,18 @@ object JobManager {
     }
 
     // run the job manager
+    SecurityContext.install(new SecurityConfiguration().setFlinkConfiguration(configuration))
+
     try {
-      if (SecurityUtils.isSecurityEnabled) {
-        LOG.info("Security is enabled. Starting secure JobManager.")
-        SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
-          override def run(): Unit = {
-            runJobManager(
-              configuration,
-              executionMode,
-              listeningHost,
-              listeningPortRange)
-          }
-        })
-      } else {
-        LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
-        runJobManager(
-          configuration,
-          executionMode,
-          listeningHost,
-          listeningPortRange)
-      }
+      SecurityContext.getInstalled.runSecured(new FlinkSecuredRunner[Unit] {
+        override def run(): Unit = {
+          runJobManager(
+            configuration,
+            executionMode,
+            listeningHost,
+            listeningPortRange)
+        }
+      })
     } catch {
       case t: Throwable =>
         LOG.error("Failed to run JobManager.", t)

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 63a64a0..8534ee1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -71,8 +71,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateRegistry
 import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer}
-import org.apache.flink.runtime.security.SecurityUtils
-import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
+import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
+import org.apache.flink.runtime.security.SecurityContext
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{MathUtils, NetUtils}
@@ -1521,19 +1521,14 @@ object TaskManager {
     val resourceId = ResourceID.generate()
 
     // run the TaskManager (if requested in an authentication enabled context)
+    SecurityContext.install(new SecurityConfiguration().setFlinkConfiguration(configuration))
+
     try {
-      if (SecurityUtils.isSecurityEnabled) {
-        LOG.info("Security is enabled. Starting secure TaskManager.")
-        SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
-          override def run(): Unit = {
-            selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
-          }
-        })
-      }
-      else {
-        LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
-        selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
-      }
+      SecurityContext.getInstalled.runSecured(new FlinkSecuredRunner[Unit] {
+        override def run(): Unit = {
+          selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
+        }
+      })
     }
     catch {
       case t: Throwable =>
@@ -1588,6 +1583,8 @@ object TaskManager {
       }
     }
 
+    conf.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, cliConfig.getConfigDir() + "/..")
+
     conf
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
new file mode 100644
index 0000000..89e5ef9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link JaasConfiguration}.
+ */
+public class JaasConfigurationTest {
+
+	@Test
+	public void testInvalidKerberosParams() {
+		String keytab = "user.keytab";
+		String principal = null;
+		try {
+			new JaasConfiguration(keytab, principal);
+		} catch(RuntimeException re) {
+			assertEquals("Both keytab and principal are required and cannot be empty",re.getMessage());
+		}
+	}
+
+	@Test
+	public void testDefaultAceEntry() {
+		JaasConfiguration conf = new JaasConfiguration(null,null);
+		javax.security.auth.login.Configuration.setConfiguration(conf);
+		final AppConfigurationEntry[] entry = conf.getAppConfigurationEntry("test");
+		AppConfigurationEntry ace = entry[0];
+		assertEquals(KerberosUtil.getKrb5LoginModuleName(), ace.getLoginModuleName());
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
new file mode 100644
index 0000000..5f3d76a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SecurityContext}.
+ */
+public class SecurityContextTest {
+
+	@Test
+	public void testCreateInsecureHadoopCtx() {
+		SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+		try {
+			SecurityContext.install(sc);
+			assertEquals(UserGroupInformation.getLoginUser().getUserName(),getOSUserName());
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testInvalidUGIContext() {
+		try {
+			new SecurityContext(null);
+		} catch (RuntimeException re) {
+			assertEquals("UGI passed cannot be null",re.getMessage());
+		}
+	}
+
+
+	private String getOSUserName() throws Exception {
+		String userName = "";
+		String osName = System.getProperty( "os.name" ).toLowerCase();
+		String className = null;
+
+		if( osName.contains( "windows" ) ){
+			className = "com.sun.security.auth.module.NTSystem";
+		}
+		else if( osName.contains( "linux" ) ){
+			className = "com.sun.security.auth.module.UnixSystem";
+		}
+		else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){
+			className = "com.sun.security.auth.module.SolarisSystem";
+		}
+
+		if( className != null ){
+			Class<?> c = Class.forName( className );
+			Method method = c.getDeclaredMethod( "getUsername" );
+			Object o = c.newInstance();
+			userName = (String) method.invoke( o );
+		}
+		return userName;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
index 5712856..edf299d 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
@@ -121,6 +121,42 @@ under the License.
 			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+		</dependency>
+
 	</dependencies>
 
+	<build>
+		<plugins>
+
+			<!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<!--
+					Enforce single threaded execution to avoid port conflicts when running
+					secure mini DFS cluster
+					-->
+					<forkCount>1</forkCount>
+					<reuseForks>false</reuseForks>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 7ee75c1..c3c8df5 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -57,6 +58,8 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -79,19 +82,24 @@ import java.util.Map;
 @Deprecated
 public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
+	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkITCase.class);
+
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
 
-	private static MiniDFSCluster hdfsCluster;
-	private static org.apache.hadoop.fs.FileSystem dfs;
-	private static String hdfsURI;
+	protected static MiniDFSCluster hdfsCluster;
+	protected static org.apache.hadoop.fs.FileSystem dfs;
+	protected static String hdfsURI;
+	protected static Configuration conf = new Configuration();
 
+	protected static File dataDir;
 
 	@BeforeClass
 	public static void createHDFS() throws IOException {
-		Configuration conf = new Configuration();
 
-		File dataDir = tempFolder.newFolder();
+		LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");
+
+		dataDir = tempFolder.newFolder();
 
 		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
 		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
@@ -106,6 +114,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 	@AfterClass
 	public static void destroyHDFS() {
+		LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster ");
 		hdfsCluster.shutdown();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
new file mode 100644
index 0000000..86cedaf
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+
+/**
+ * Tests for running the {@link RollingSink} in a secure environment; extends {@link RollingSinkITCase}.
+ */
+
+//The test is disabled since a secure MiniDFS run requires privileged (low-numbered) ports to be used.
+//We can enable the test when the fix is available (HDFS-9213)
+@Ignore
+public class RollingSinkSecuredITCase extends RollingSinkITCase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
+
+	/*
+	 * override super class static methods to avoid creating MiniDFS and MiniFlink with wrong configurations
+	 * and out-of-order sequence for secure cluster
+	 */
+	@BeforeClass
+	public static void setup() throws Exception {}
+
+	@AfterClass
+	public static void teardown() throws Exception {}
+
+	@BeforeClass
+	public static void createHDFS() throws IOException {}
+
+	@AfterClass
+	public static void destroyHDFS() {}
+
+	@BeforeClass
+	public static void startSecureCluster() throws Exception {
+
+		LOG.info("starting secure cluster environment for testing");
+
+		dataDir = tempFolder.newFolder();
+
+		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+
+		SecureTestEnvironment.prepare(tempFolder);
+
+		populateSecureConfigurations();
+
+		Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+				SecureTestEnvironment.getTestKeytab());
+		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+				SecureTestEnvironment.getHadoopServicePrincipal());
+
+		SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
+		ctx.setFlinkConfiguration(flinkConfig);
+		ctx.setHadoopConfiguration(conf);
+		try {
+			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
+		} catch (Exception e) {
+			throw new RuntimeException("Exception occurred while setting up the secure test context.", e);
+		}
+
+		File hdfsSiteXML = new File(dataDir.getAbsolutePath() + "/hdfs-site.xml");
+
+		FileWriter writer = new FileWriter(hdfsSiteXML);
+		conf.writeXml(writer);
+		writer.flush();
+		writer.close();
+
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath());
+		TestBaseUtils.setEnv(map);
+
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+		builder.checkDataNodeAddrConfig(true);
+		builder.checkDataNodeHostConfig(true);
+		hdfsCluster = builder.build();
+
+		dfs = hdfsCluster.getFileSystem();
+
+		hdfsURI = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+				+ "/";
+
+		startSecureFlinkClusterWithRecoveryModeEnabled();
+	}
+
+	@AfterClass
+	public static void teardownSecureCluster() throws Exception {
+		LOG.info("tearing down secure cluster environment");
+
+		TestStreamEnvironment.unsetAsContext();
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+
+		hdfsCluster.shutdown();
+
+		SecureTestEnvironment.cleanup();
+	}
+
+	private static void populateSecureConfigurations() {
+
+		String dataTransferProtection = "authentication";
+
+		SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+		conf.set(DFS_NAMENODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+		conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
+		conf.set(DFS_DATANODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+		conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
+		conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+
+		conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+
+		conf.set("dfs.data.transfer.protection", dataTransferProtection);
+
+		conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name());
+
+		conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "false");
+
+		conf.setInt("dfs.datanode.socket.write.timeout", 0);
+
+		/*
+		 * We are setting the port numbers to privileged ports - see HDFS-9213.
+		 * This requires the user to have root privileges to bind to the ports.
+		 * Use the below command (Ubuntu) to grant the Java process the privilege
+		 * needed for bind() to work if it is not running as root.
+		 * setcap 'cap_net_bind_service=+ep' /path/to/java
+		 */
+		conf.set(DFS_DATANODE_ADDRESS_KEY, "localhost:1002");
+		conf.set(DFS_DATANODE_HOST_NAME_KEY, "localhost");
+		conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
+	}
+
+	private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
+		try {
+			LOG.info("Starting Flink and ZK in secure mode");
+
+			dfs.mkdirs(new Path("/flink/checkpoints"));
+			dfs.mkdirs(new Path("/flink/recovery"));
+
+			org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
+
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+			config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+			config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+			config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
+			config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, hdfsURI + "/flink/recovery");
+			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
+
+			SecureTestEnvironment.populateFlinkSecureConfigurations(config);
+
+			cluster = TestBaseUtils.startCluster(config, false);
+			TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
+
+		} catch (Exception e) {
+			LOG.error("Exception occurred while creating the MiniFlink cluster.", e);
+			throw new RuntimeException(e);
+		}
+	}
+
+	/* For secure cluster testing, it is enough to run only one test and override the below test methods
+	 * to keep the overall build time minimal
+	 */
+	@Override
+	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {}
+
+	@Override
+	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {}
+
+	@Override
+	public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {}
+
+	@Override
+	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {}
+
+	@Override
+	public void testDateTimeRollingStringWriter() throws Exception {}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
index fe60d94..5c22851 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
@@ -25,3 +25,5 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index fc13719..5c951db 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
-
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 864773a..cbf3d06 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -81,6 +81,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public Properties getSecureProperties() {
+		return null;
+	}
+
+	@Override
 	public String getVersion() {
 		return "0.8";
 	}
@@ -132,9 +137,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		return server.socketServer().brokerId();
 	}
 
+	@Override
+	public boolean isSecureRunSupported() {
+		return false;
+	}
+
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
 		this.additionalServerProperties = additionalServerProperties;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
@@ -210,6 +220,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		if (zookeeper != null) {
 			try {
 				zookeeper.stop();
+				zookeeper.close();
 			}
 			catch (Exception e) {
 				LOG.warn("ZK.stop() failed", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index 3b31de6..bfcde82 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -134,6 +134,13 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>
@@ -180,6 +187,17 @@ under the License.
 					<argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
 				</configuration>
 			</plugin>
+			<!--
+				https://issues.apache.org/jira/browse/DIRSHARED-134
+				Required to pull in the Mini-KDC transitive dependency.
+			-->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
 		</plugins>
 	</build>
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 957833d..16ddcdc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -21,12 +21,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.junit.Test;
 
+import java.util.Properties;
 import java.util.UUID;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
 public class Kafka09ITCase extends KafkaConsumerTestBase {
 
 	// ------------------------------------------------------------------------
@@ -131,11 +131,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	public void testJsonTableSource() throws Exception {
 		String topic = UUID.randomUUID().toString();
 
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
 		// Names and types are determined in the actual test method of the
 		// base test class.
 		Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
 				topic,
-				standardProps,
+				props,
 				new String[] {
 						"long",
 						"string",
@@ -159,11 +163,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	public void testJsonTableSourceWithFailOnMissingField() throws Exception {
 		String topic = UUID.randomUUID().toString();
 
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
 		// Names and types are determined in the actual test method of the
 		// base test class.
 		Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
 				topic,
-				standardProps,
+				props,
 				new String[] {
 						"long",
 						"string",

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index 1288347..ae4f5b2 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
-
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
new file mode 100644
index 0000000..d12ec65
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/*
+ * Kafka secure connection (Kerberos) IT test case
+ */
+public class Kafka09SecureRunITCase extends KafkaConsumerTestBase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecureRunITCase.class);
+
+	@BeforeClass
+	public static void prepare() throws IOException, ClassNotFoundException {
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting Kafka09SecureRunITCase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		SecureTestEnvironment.prepare(tempFolder);
+		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+		startClusters(true);
+	}
+
+	@AfterClass
+	public static void shutDownServices() {
+		shutdownClusters();
+		SecureTestEnvironment.cleanup();
+	}
+
+
+	//The timeout interval is large, since ZK connection timeouts occur frequently on Travis.
+	//The timeout for the test case is set to twice the ZK connection timeout.
+	@Test(timeout = 600000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+}
\ No newline at end of file


[3/5] flink git commit: [FLINK-3929] Added Keytab-based Kerberos support to enable secure Flink cluster deployment (addresses HDFS, Kafka and ZK services)

Posted by mx...@apache.org.
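
The diffs that follow route the client, the application master and the task managers through a keytab-capable security context. At the core of that machinery sits Hadoop's standard keytab login. The following is a minimal, self-contained sketch of just that mechanism; it is not code from this patch, and the principal and keytab path are hypothetical placeholders.

    // Minimal sketch of Hadoop's keytab-based Kerberos login (illustrative only).
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KeytabLoginSketch {
        public static void main(String[] args) throws Exception {
            Configuration hadoopConf = new Configuration();
            hadoopConf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(hadoopConf);
            // Logs in the process-wide "login user" from the keytab; subsequent
            // HDFS/YARN calls in this JVM authenticate as that principal.
            UserGroupInformation.loginUserFromKeytab(
                "flink/host.example.com@EXAMPLE.COM",  // hypothetical principal
                "/etc/security/keytabs/flink.keytab"); // hypothetical keytab path
            System.out.println("Logged in as " + UserGroupInformation.getLoginUser().getUserName());
        }
    }
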
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index f4c2032..848013c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -59,7 +60,6 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -341,26 +341,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	@Override
 	public YarnClusterClient deploy() {
-
 		try {
-
-			UserGroupInformation.setConfiguration(conf);
-			UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-			if (UserGroupInformation.isSecurityEnabled()) {
-				if (!ugi.hasKerberosCredentials()) {
-					throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
-						"You may use kinit to authenticate and request a TGT from the Kerberos server.");
-				}
-				return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
-					@Override
-					public YarnClusterClient run() throws Exception {
-						return deployInternal();
-					}
-				});
-			} else {
-				return deployInternal();
-			}
+			return deployInternal();
 		} catch (Exception e) {
 			throw new RuntimeException("Couldn't deploy Yarn cluster", e);
 		}
@@ -539,9 +521,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		addLibFolderToShipFiles(effectiveShipFiles);
+		//check if there is a JAAS config file
+		File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityContext.JAAS_CONF_FILENAME);
+		if (jaasConfigFile.exists() && jaasConfigFile.isFile()) {
+			effectiveShipFiles.add(jaasConfigFile);
+		}
 
-		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j);
+		addLibFolderToShipFiles(effectiveShipFiles);
 
 		// Set-up ApplicationSubmissionContext for the application
 		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
@@ -626,8 +612,53 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
 		fs.setPermission(sessionFilesDir, permission); // set permission for path.
 
+		//To support the YARN secure integration test scenario:
+		//in the integration test setup, the YARN containers created by YarnMiniCluster do not have the yarn-site.xml
+		//and KRB5 configuration files. We add these files as container-local resources so that the container
+		//applications (JM/TMs) get a proper secure cluster setup.
+		Path remoteKrb5Path = null;
+		Path remoteYarnSiteXmlPath = null;
+		boolean hasKrb5 = false;
+		if(System.getenv("IN_TESTS") != null) {
+			String krb5Config = System.getProperty("java.security.krb5.conf");
+			if(krb5Config != null && krb5Config.length() != 0) {
+				File krb5 = new File(krb5Config);
+				LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
+				LocalResource krb5ConfResource = Records.newRecord(LocalResource.class);
+				Path krb5ConfPath = new Path(krb5.getAbsolutePath());
+				remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
+				localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+
+				File f = new File(System.getenv("YARN_CONF_DIR"),Utils.YARN_SITE_FILE_NAME);
+				LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
+				LocalResource yarnConfResource = Records.newRecord(LocalResource.class);
+				Path yarnSitePath = new Path(f.getAbsolutePath());
+				remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory());
+				localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+
+				hasKrb5 = true;
+			}
+		}
+
 		// setup security tokens
-		Utils.setTokensFor(amContainer, paths, conf);
+		LocalResource keytabResource = null;
+		Path remotePathKeytab = null;
+		String keytab = flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+		if(keytab != null) {
+			LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
+			keytabResource = Records.newRecord(LocalResource.class);
+			Path keytabPath = new Path(keytab);
+			remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory());
+			localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+		}
+
+		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
+
+		if (UserGroupInformation.isSecurityEnabled() && keytab == null) {
+			//set tokens only when keytab is not provided
+			LOG.info("Adding delegation token to the AM container..");
+			Utils.setTokensFor(amContainer, paths, conf);
+		}
 
 		amContainer.setLocalResources(localResources);
 		fs.close();
@@ -646,11 +677,25 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
-		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
 		appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
 		appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
 		appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
 
+		// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
+		appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+		if(keytabResource != null) {
+			appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
+			String principal = flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+			appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
+		}
+
+		//To support Yarn Secure Integration Test Scenario
+		if(remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
+			appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
+			appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
+		}
+
 		if(dynamicPropertiesEncoded != null) {
 			appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
 		}
@@ -700,6 +745,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 				throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
 			}
 			YarnApplicationState appState = report.getYarnApplicationState();
+			LOG.debug("Application State: {}", appState);
 			switch(appState) {
 				case FAILED:
 				case FINISHED:
@@ -996,7 +1042,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 	}
 
-	protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback, boolean hasLog4j) {
+	protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback,
+																	boolean hasLog4j,
+																	boolean hasKrb5) {
 		// ------------------ Prepare Application Master Container  ------------------------------
 
 		// respect custom JVM options in the YAML file
@@ -1021,6 +1069,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
+		//applicable only for YarnMiniCluster secure test run
+		//krb5.conf file will be available as local resource in JM/TM container
+		if(hasKrb5) {
+			amCommand += " -Djava.security.krb5.conf=krb5.conf";
+		}
+
 		amCommand += " " + getApplicationMasterClass().getName() + " "
 			+ " 1>"
 			+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 1496d61..94d4582 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -60,6 +60,14 @@ public final class Utils {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
 
+	/** Keytab file name populated in YARN container */
+	public static final String KEYTAB_FILE_NAME = "krb5.keytab";
+
+	/** KRB5 file name populated in YARN container for secure IT run */
+	public static final String KRB5_FILE_NAME = "krb5.conf";
+
+	/** Yarn site xml file name populated in YARN container for secure IT run */
+	public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
 
 	/**
 	 * See documentation

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 6619633..efb658a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -40,11 +41,11 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -60,11 +61,10 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.UUID;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
@@ -117,7 +117,7 @@ public class YarnApplicationMasterRunner {
 
 	/**
 	 * The instance entry point for the YARN application master. Obtains user group
-	 * information and calls the main work method {@link #runApplicationMaster()} as a
+	 * information and calls the main work method {@link #runApplicationMaster(Configuration)} as a
 	 * privileged action.
 	 *
 	 * @param args The command line arguments.
@@ -127,34 +127,66 @@ public class YarnApplicationMasterRunner {
 		try {
 			LOG.debug("All environment variables: {}", ENV);
 
-			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
 			require(yarnClientUsername != null, "YARN client user name environment variable %s not set",
-				YarnConfigKeys.ENV_CLIENT_USERNAME);
+				YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
-			final UserGroupInformation currentUser;
-			try {
-				currentUser = UserGroupInformation.getCurrentUser();
-			} catch (Throwable t) {
-				throw new Exception("Cannot access UserGroupInformation information for current user", t);
+			final String currDir = ENV.get(Environment.PWD.key());
+			require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+			LOG.debug("Current working Directory: {}", currDir);
+
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+			LOG.debug("remoteKeytabPath obtained {}", remoteKeytabPath);
+
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+			LOG.info("remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
+
+			String keytabPath = null;
+			if(remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.debug("keytabPath: {}", keytabPath);
 			}
 
-			LOG.info("YARN daemon runs as user {}. Running Flink Application Master/JobManager as user {}",
-				currentUser.getShortUserName(), yarnClientUsername);
+			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+			LOG.info("YARN daemon is running as: {}; YARN client user: {}",
+					currentUser.getShortUserName(), yarnClientUsername);
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			//To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if(krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
 
-			UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+			// Flink configuration
+			final Map<String, String> dynamicProperties =
+					FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
 
-			// transfer all security tokens, for example for authenticated HDFS and HBase access
-			for (Token<?> token : currentUser.getTokens()) {
-				ugi.addToken(token);
+			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
+			flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
 
-			// run the actual work in a secured privileged action
-			return ugi.doAs(new PrivilegedAction<Integer>() {
+			SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
 				@Override
 				public Integer run() {
-					return runApplicationMaster();
+					return runApplicationMaster(flinkConfig);
 				}
 			});
+
 		}
 		catch (Throwable t) {
 			// make sure that everything whatever ends up in the log
@@ -172,7 +204,7 @@ public class YarnApplicationMasterRunner {
 	 *
 	 * @return The return code for the Java process.
 	 */
-	protected int runApplicationMaster() {
+	protected int runApplicationMaster(Configuration config) {
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
 
@@ -194,12 +226,21 @@ public class YarnApplicationMasterRunner {
 
 			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
 
-			// Flink configuration
-			final Map<String, String> dynamicProperties =
-				FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+			//Update keytab and principal path to reflect YARN container path location
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
 
-			final Configuration config = createConfiguration(currDir, dynamicProperties);
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+
+			String keytabPath = null;
+			if(remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.info("keytabPath: {}", keytabPath);
+			}
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
 
 			// Hadoop/Yarn configuration (loads config data automatically from classpath files)
 			final YarnConfiguration yarnConfig = new YarnConfiguration();
@@ -523,8 +564,20 @@ public class YarnApplicationMasterRunner {
 		String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
 		require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
 
-		String yarnClientUsername = env.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
-		require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_USERNAME);
+		String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+		require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+		final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
+		LOG.info("TM:remoteKeytabPath obtained {}", remoteKeytabPath);
+
+		final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM:remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
+
+		final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+		LOG.info("TM:remoteYarnConfPath obtained {}", remoteYarnConfPath);
+
+		final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
+		LOG.info("TM:remoteKrb5Path obtained {}", remoteKrb5Path);
 
 		String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
 		require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
@@ -537,6 +590,33 @@ public class YarnApplicationMasterRunner {
 			throw new Exception("Could not access YARN's default file system", e);
 		}
 
+		//register keytab
+		LocalResource keytabResource = null;
+		if(remoteKeytabPath != null) {
+			LOG.info("Adding keytab {} to the TM container local resource bucket", remoteKeytabPath);
+			keytabResource = Records.newRecord(LocalResource.class);
+			Path keytabPath = new Path(remoteKeytabPath);
+			Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
+		}
+
+		//To support Yarn Secure Integration Test Scenario
+		LocalResource yarnConfResource = null;
+		LocalResource krb5ConfResource = null;
+		boolean hasKrb5 = false;
+		if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+			LOG.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
+			yarnConfResource = Records.newRecord(LocalResource.class);
+			Path yarnConfPath = new Path(remoteYarnConfPath);
+			Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
+
+			LOG.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
+			krb5ConfResource = Records.newRecord(LocalResource.class);
+			Path krb5ConfPath = new Path(remoteKrb5Path);
+			Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
+
+			hasKrb5 = true;
+		}
+
 		// register Flink Jar with remote HDFS
 		LocalResource flinkJar = Records.newRecord(LocalResource.class);
 		{
@@ -563,6 +643,16 @@ public class YarnApplicationMasterRunner {
 		taskManagerLocalResources.put("flink.jar", flinkJar);
 		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
 
+		//To support Yarn Secure Integration Test Scenario
+		if(yarnConfResource != null && krb5ConfResource != null) {
+			taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+			taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+		}
+
+		if(keytabResource != null) {
+			taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+		}
+
 		// prepare additional files to be shipped
 		for (String pathStr : shipListString.split(",")) {
 			if (!pathStr.isEmpty()) {
@@ -582,7 +672,7 @@ public class YarnApplicationMasterRunner {
 
 		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
 			flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-			hasLogback, hasLog4j, taskManagerMainClass);
+			hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
 
 		log.info("Starting TaskManagers with command: " + launchCommand);
 
@@ -597,11 +687,17 @@ public class YarnApplicationMasterRunner {
 		containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
 		Utils.setupYarnClassPath(yarnConfig, containerEnv);
 
-		containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, yarnClientUsername);
+		containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+		if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+			containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
+			containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
+		}
 
 		ctx.setEnvironment(containerEnv);
 
 		try (DataOutputBuffer dob = new DataOutputBuffer()) {
+			LOG.debug("Adding security tokens to the TaskManager container launch context...");
 			UserGroupInformation user = UserGroupInformation.getCurrentUser();
 			Credentials credentials = user.getCredentials();
 			credentials.writeTokenStorageToStream(dob);
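
The hunks above condense to an install-then-run pattern that also appears in YarnTaskManagerRunner and FlinkYarnSessionCli below. A self-contained sketch, assuming only the SecurityContext API visible in these diffs; the keytab path and principal are hypothetical placeholders:

    // Sketch of the install-then-run pattern used in this patch (placeholder values).
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.security.SecurityContext;

    public class SecuredRunSketch {
        public static void main(String[] args) throws Exception {
            Configuration flinkConfig = new Configuration();
            flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, "/path/to/flink.keytab"); // hypothetical
            flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, "flink@EXAMPLE.COM");  // hypothetical

            SecurityContext.install(new SecurityContext.SecurityConfiguration()
                .setFlinkConfiguration(flinkConfig));

            // Everything inside run() executes under the installed security context
            // (a keytab-based Kerberos login when configured, the current user otherwise).
            int ret = SecurityContext.getInstalled().runSecured(
                new SecurityContext.FlinkSecuredRunner<Integer>() {
                    @Override
                    public Integer run() {
                        return 0; // placeholder for the actual work method
                    }
                });
            System.exit(ret);
        }
    }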

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index b14d7b7..ada241c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -32,7 +32,6 @@ public class YarnConfigKeys {
 	public final static String ENV_APP_ID = "_APP_ID";
 	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
 	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
 	public static final String ENV_SLOTS = "_SLOTS";
 	public static final String ENV_DETACHED = "_DETACHED";
 	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
@@ -41,8 +40,13 @@ public class YarnConfigKeys {
 
 	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
 
+	public final static String KEYTAB_PATH = "_KEYTAB_PATH";
+	public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+	public final static String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
 	public static final String ENV_ZOOKEEPER_NAMESPACE = "_ZOOKEEPER_NAMESPACE";
 
+	public static final String ENV_KRB5_PATH = "_KRB5_PATH";
+	public static final String ENV_YARN_SITE_XML_PATH = "_YARN_SITE_XML_PATH";
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 9638137..c70a30b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -18,22 +18,22 @@
 
 package org.apache.flink.yarn;
 
+import java.io.File;
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.Map;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 
 import org.slf4j.Logger;
@@ -64,8 +64,18 @@ public class YarnTaskManagerRunner {
 
 		// read the environment variables for YARN
 		final Map<String, String> envs = System.getenv();
-		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
 		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+		LOG.info("Current local directories: {}", localDirs);
+
+		final String currDir = envs.get(Environment.PWD.key());
+		LOG.info("Current working directory: {}", currDir);
+
+		final String remoteKeytabPath = envs.get(YarnConfigKeys.KEYTAB_PATH);
+		LOG.info("TM: remoteKeytabPath obtained {}", remoteKeytabPath);
+
+		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
 
 		// configure local directory
 		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
@@ -78,34 +88,66 @@ public class YarnTaskManagerRunner {
 				"specified in the Flink config: " + flinkTempDirs);
 		}
 
-		LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() +
-			"' setting user to execute Flink TaskManager to '" + yarnClientUsername + "'");
-
 		// tell akka to die in case of an error
 		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
 
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
+		String keytabPath = null;
+		if(remoteKeytabPath != null) {
+			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+			keytabPath = f.getAbsolutePath();
+			LOG.info("keytabPath: {}", keytabPath);
 		}
 
+		UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+		LOG.info("YARN daemon is running as: {}; YARN client user: {}",
+				currentUser.getShortUserName(), yarnClientUsername);
+
 		// Infer the resource identifier from the environment variable
 		String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
 		final ResourceID resourceId = new ResourceID(containerID);
 		LOG.info("ResourceID assigned for this container: {}", resourceId);
 
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
-				}
-				catch (Throwable t) {
-					LOG.error("Error while starting the TaskManager", t);
-					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-				}
-				return null;
+		try {
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			//To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if(krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
+
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
-		});
+			configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
+
+			SecurityContext.install(sc.setFlinkConfiguration(configuration));
+
+			SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					try {
+						TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
+					}
+					catch (Throwable t) {
+						LOG.error("Error while starting the TaskManager", t);
+						System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+					}
+					return null;
+				}
+			});
+		} catch(Exception e) {
+			LOG.error("Exception occurred while launching the TaskManager.", e);
+			throw new RuntimeException(e);
+		}
+
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 8f02a1c..b5364f0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,11 +24,14 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -460,9 +463,27 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		}
 	}
 
-	public static void main(String[] args) {
-		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
-		System.exit(cli.run(args));
+	public static void main(final String[] args) {
+		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
+
+		String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
+		GlobalConfiguration.loadConfiguration(confDirPath);
+		Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+		flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, confDirPath);
+		try {
+			SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
+			int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					return cli.run(args);
+				}
+			});
+			System.exit(retCode);
+		} catch(Exception e) {
+			e.printStackTrace();
+			LOG.error("Exception occurred.", e);
+			return;
+		}
 	}
 
 	@Override
@@ -523,6 +544,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		try {
 			return yarnClusterDescriptor.deploy();
 		} catch (Exception e) {
+			LOG.error("Error while deploying YARN cluster: " + e.getMessage(), e);
 			throw new RuntimeException("Error deploying the YARN cluster", e);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 02e868e..5b3148a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,13 @@ under the License.
 		<jackson.version>2.7.4</jackson.version>
 		<metrics.version>3.1.0</metrics.version>
 		<junit.version>4.11</junit.version>
+		<!--
+			Keeping the MiniKDC version fixed instead of deriving it from the Hadoop version,
+			in order to support testing Kafka, ZK, etc., modules that do not have a Hadoop dependency.
+			Starting with Hadoop 3, org.apache.kerby will be used instead of MiniKDC. We may have
+			to revisit the impact at that time.
+		-->
+		<minikdc.version>2.7.2</minikdc.version>
 	</properties>
 
 	<dependencies>
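
For reference, the hadoop-minikdc artifact pinned here is typically driven from test code along the following lines. This is a sketch against the MiniKdc API of Hadoop 2.7.x, not code from this patch; the work directory and principals are made up:

    // Sketch: spinning up a MiniKdc for a Kerberos test fixture (illustrative only).
    import java.io.File;
    import java.util.Properties;
    import org.apache.hadoop.minikdc.MiniKdc;

    public class MiniKdcSketch {
        public static void main(String[] args) throws Exception {
            File workDir = new File(System.getProperty("java.io.tmpdir"), "mini-kdc");
            workDir.mkdirs();
            Properties kdcConf = MiniKdc.createConf(); // defaults; realm EXAMPLE.COM
            MiniKdc kdc = new MiniKdc(kdcConf, workDir);
            kdc.start();
            try {
                // Write a keytab containing hypothetical test principals.
                File keytab = new File(workDir, "test.keytab");
                kdc.createPrincipal(keytab, "client", "kafka/localhost");
                System.out.println("KDC running, realm: " + kdc.getRealm());
            } finally {
                kdc.stop();
            }
        }
    }
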

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 53379b4..476cee3 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -45,3 +45,6 @@ log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
 # the tests
 log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console
 log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO, console
+log4j.logger.org.apache.flink.streaming.connectors.kafka=INFO, console
+log4j.logger.org.I0Itec.zkclient=INFO, console
+log4j.logger.org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread=OFF
\ No newline at end of file


[4/5] flink git commit: [FLINK-3929] Added Keytab-based Kerberos support to enable secure Flink cluster deployment (addresses HDFS, Kafka and ZK services)

Posted by mx...@apache.org.
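
On the Kafka side, the changes below switch the test brokers to SASL_PLAINTEXT listeners and thread a set of secure client properties through the tests. The client side reduces to two SASL settings; a minimal sketch mirroring the getSecureProperties() hunk further down (JAAS and krb5.conf are assumed to be supplied to the JVM separately):

    // Sketch: client-side properties for a SASL_PLAINTEXT (Kerberos) Kafka setup.
    import java.util.Properties;

    public class SecureKafkaPropsSketch {
        static Properties secureClientProps() {
            Properties props = new Properties();
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.kerberos.service.name", "kafka");
            return props;
        }
    }

The test changes below then merge these on top of the standard properties before building consumers and producers, i.e. props.putAll(standardProps); props.putAll(secureProps);.
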
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0dbe865..213ba4a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -65,6 +65,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String brokerConnectionString = "";
 	private Properties standardProps;
 	private Properties additionalServerProperties;
+	private boolean secureMode = false;
+	// The default of 6 seconds seems to be too small for Travis; use 30 seconds instead.
+	private String zkTimeout = "30000";
 
 	public String getBrokerConnectionString() {
 		return brokerConnectionString;
@@ -131,8 +134,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+	public boolean isSecureRunSupported() {
+		return true;
+	}
+
+	@Override
+	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+
+		//Increase the timeout, since the secure ZK connection takes a long time on Travis.
+		if(secureMode) {
+			//Run only one Kafka server, to avoid the Travis timeouts caused by multiple ZK connections from many instances.
+			numKafkaServers = 1;
+			zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
+		}
+
 		this.additionalServerProperties = additionalServerProperties;
+		this.secureMode = secureMode;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
@@ -155,6 +172,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			LOG.info("Starting Zookeeper");
 			zookeeper = new TestingServer(-1, tmpZkDir);
 			zookeeperConnectionString = zookeeper.getConnectString();
+			LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
 
 			LOG.info("Starting KafkaServer");
 			brokers = new ArrayList<>(numKafkaServers);
@@ -163,7 +181,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
 				SocketServer socketServer = brokers.get(i).socketServer();
-				brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+				if(secureMode) {
+					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+				} else {
+					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+				}
 			}
 
 			LOG.info("ZK and KafkaServer started.");
@@ -173,15 +195,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			fail("Test setup failed: " + t.getMessage());
 		}
 
+		LOG.info("brokerConnectionString: {}", brokerConnectionString);
+
 		standardProps = new Properties();
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
 		standardProps.setProperty("group.id", "flink-tests");
 		standardProps.setProperty("auto.commit.enable", "false");
-		standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
-		standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
+		standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+		standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
 		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
 		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
 	}
 
 	@Override
@@ -196,6 +221,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		if (zookeeper != null) {
 			try {
 				zookeeper.stop();
+				zookeeper.close();
 			}
 			catch (Exception e) {
 				LOG.warn("ZK.stop() failed", e);
@@ -224,6 +250,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	public ZkUtils getZkUtils() {
+		LOG.info("getZkUtils: zookeeperConnectionString = {}", zookeeperConnectionString);
 		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
 				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
 		return ZkUtils.apply(creator, false);
@@ -241,23 +268,37 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			zkUtils.close();
 		}
 
+		LOG.info("Create request for topic {} was posted successfully", topic);
+
 		// validate that the topic has been created
-		final long deadline = System.currentTimeMillis() + 30000;
+		final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout);
 		do {
 			try {
-				Thread.sleep(100);
+				if(secureMode) {
+					//Increase the wait time, since ZK timeouts occur frequently on Travis.
+					int wait = Integer.parseInt(zkTimeout) / 100;
+					LOG.info("Waiting {} ms before checking whether topic {} exists", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+
 			} catch (InterruptedException e) {
 				// restore interrupted state
 			}
 			// we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
 			// not always correct.
 
+			LOG.info("Validating whether topic {} has been created", topic);
+
 			// create a new ZK utils connection
 			ZkUtils checkZKConn = getZkUtils();
 			if(AdminUtils.topicExists(checkZKConn, topic)) {
+				LOG.info("Topic {} has been created successfully", topic);
 				checkZKConn.close();
 				return;
 			}
+			LOG.info("Topic {} has not been created yet; will check again...", topic);
 			checkZKConn.close();
 		}
 		while (System.currentTimeMillis() < deadline);
@@ -296,8 +337,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
 
 		// for CI stability, increase zookeeper session timeout
-		kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
-		kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
+		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
 		if(additionalServerProperties != null) {
 			kafkaProperties.putAll(additionalServerProperties);
 		}
@@ -307,6 +348,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		for (int i = 1; i <= numTries; i++) {
 			int kafkaPort = NetUtils.getAvailablePort();
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			//To support a secure Kafka cluster
+			if(secureMode) {
+				LOG.info("Adding Kafka secure configurations");
+				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.putAll(getSecureProperties());
+			}
+
 			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
 
 			try {
@@ -329,4 +379,19 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
 	}
 
+	public Properties getSecureProperties() {
+		Properties prop = new Properties();
+		if(secureMode) {
+			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+			prop.put("security.protocol", "SASL_PLAINTEXT");
+			prop.put("sasl.kerberos.service.name", "kafka");
+
+			//add special timeout for Travis
+			prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+			prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+			prop.setProperty("metadata.fetch.timeout.ms", "120000");
+		}
+		return prop;
+	}
+
 }
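
Putting the pieces together, a job consuming from a broker secured this way would look roughly as follows. This is a sketch only; the broker address, group id and topic name are hypothetical:

    // Sketch: consuming from a SASL_PLAINTEXT-secured broker with the 0.9 connector.
    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class SecureConsumerSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // hypothetical
            props.put("group.id", "secure-consumer-sketch");   // hypothetical
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.kerberos.service.name", "kafka");
            DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer09<>("secure-topic", new SimpleStringSchema(), props));
            stream.print();
            env.execute("secure-consumer-sketch");
        }
    }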

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
index fbeb110..4ac1773 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -28,3 +28,5 @@ log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
index 49d630f..ef71bde 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -161,6 +161,14 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<dependencyManagement>
@@ -187,6 +195,17 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+			<!--
+				https://issues.apache.org/jira/browse/DIRSHARED-134
+				Required to pull in the Mini-KDC transitive dependency.
+			-->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
 		</plugins>
 	</build>
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 920f15b..a87ff8a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -181,6 +181,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			properties.setProperty("session.timeout.ms", "2000");
 			properties.setProperty("fetch.max.wait.ms", "2000");
 			properties.setProperty("heartbeat.interval.ms", "1000");
+			properties.putAll(secureProps);
 			FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
 			DataStream<String> stream = see.addSource(source);
 			stream.print();
@@ -275,6 +276,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		});
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
+		producerProperties.putAll(secureProps);
 		FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
 		stream.addSink(prod);
 
@@ -283,7 +285,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		List<String> topics = new ArrayList<>();
 		topics.add(topic);
 		topics.add(additionalEmptyTopic);
-		FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, standardProps);
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, props);
 
 		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
 
@@ -371,7 +377,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 				.addSource(kafkaSource)
@@ -416,7 +426,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 				.addSource(kafkaSource)
@@ -463,7 +476,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.getConfig().disableSysoutLogging();
 		env.setBufferTimeout(0);
 
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 			.addSource(kafkaSource)
@@ -506,7 +522,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.enableCheckpointing(100);
 					env.getConfig().disableSysoutLogging();
 
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+					Properties props = new Properties();
+					props.putAll(standardProps);
+					props.putAll(secureProps);
+					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
 
 					env.addSource(source).addSink(new DiscardingSink<String>());
 
@@ -577,7 +596,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.enableCheckpointing(100);
 					env.getConfig().disableSysoutLogging();
 
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+					Properties props = new Properties();
+					props.putAll(standardProps);
+					props.putAll(secureProps);
+					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
 
 					env.addSource(source).addSink(new DiscardingSink<String>());
 
@@ -629,7 +651,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setParallelism(12); // needs to be more than the mini cluster has slots
 		env.getConfig().disableSysoutLogging();
 
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 				.addSource(kafkaSource)
@@ -700,15 +725,19 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
 
-		stream.addSink(kafkaServer.getProducer("dummy", schema, standardProps, null));
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
+		stream.addSink(kafkaServer.getProducer("dummy", schema, props, null));
 
 		env.execute("Write to topics");
 
 		// run second job consuming from multiple topics
 		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.getConfig().disableSysoutLogging();
-		
-		stream = env.addSource(kafkaServer.getConsumer(topics, schema, standardProps));
+
+		stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
 
 		stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
 			Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
@@ -787,6 +816,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		// Produce serialized JSON data
 		createTestTopic(topic, 1, 1);
 
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createRemoteEnvironment("localhost", flinkPort);
 		env.getConfig().disableSysoutLogging();
@@ -805,7 +838,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}).addSink(kafkaServer.getProducer(
 				topic,
 				new ByteArraySerializationSchema(),
-				standardProps,
+				props,
 				null));
 
 		// Execute blocks
@@ -940,6 +973,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14));
 		consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher
 		consumerProps.setProperty("queued.max.message.chunks", "1");
+		consumerProps.putAll(secureProps);
 
 		FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps);
 		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
@@ -969,6 +1003,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		Properties producerProps = new Properties();
 		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15));
 		producerProps.setProperty("retries", "3");
+		producerProps.putAll(secureProps);
 		producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
 
 		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
@@ -1047,8 +1082,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
 
-
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
 		env
 				.addSource(kafkaSource)
@@ -1097,6 +1134,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
+		producerProperties.putAll(secureProps);
 		kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
 		env.execute("Write KV to Kafka");
 
@@ -1110,7 +1148,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
 
-		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, standardProps));
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props));
 		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
 			long counter = 0;
 			@Override
@@ -1178,6 +1219,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
+		producerProperties.putAll(secureProps);
+
 		kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
 
 		env.execute("Write deletes to Kafka");
@@ -1189,7 +1232,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
 
-		DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, props));
 
 		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
 			long counter = 0;
@@ -1226,7 +1272,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env1.getConfig().disableSysoutLogging();
 
-		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps));
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
+		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props));
 		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
 			@Override
 			public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
@@ -1262,8 +1312,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env1.getConfig().disableSysoutLogging();
 					env1.disableOperatorChaining(); // let the source read everything into the network buffers
 
+					Properties props = new Properties();
+					props.putAll(standardProps);
+					props.putAll(secureProps);
+
 					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
-					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, props));
 					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
 						@Override
 						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
@@ -1288,7 +1342,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 						}
 					});
 
-					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), props, null));
 
 					env1.execute("Metrics test job");
 				} catch(Throwable t) {
@@ -1403,6 +1457,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
 				new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
 
+		cc.putAll(secureProps);
 		// create the consumer
 		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
 
@@ -1505,6 +1560,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			// the producer must not produce duplicates
 			Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 			producerProperties.setProperty("retries", "0");
+			producerProperties.putAll(secureProps);
 			
 			stream.addSink(kafkaServer.getProducer(
 							topicName, serSchema, producerProperties,
@@ -1537,7 +1593,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			
 			Properties readProps = (Properties) standardProps.clone();
 			readProps.setProperty("group.id", "flink-tests-validator");
-			
+			readProps.putAll(secureProps);
 			FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
 
 			readEnv
@@ -1672,6 +1728,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
 		newProps.setProperty("auto.offset.reset", "smallest");
 		newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
+		newProps.putAll(secureProps);
 
 		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
 		printTopic(topicName, printerConfig, deserializer, elements);
@@ -1893,8 +1950,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
 			new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());
 
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
 		FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
-			.getConsumer(topics, sourceSchema, standardProps)
+			.getConsumer(topics, sourceSchema, props)
 			.assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
 
 		DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);

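The hunks above all apply the same pattern: the secure settings are layered on top of the
standard client settings before a consumer or producer is created, so secure and non-secure
runs share one code path. A condensed sketch of that pattern, reusing the standardProps and
secureProps fields from the test base (topic name and schema are illustrative only):

    Properties props = new Properties();
    props.putAll(standardProps);
    props.putAll(secureProps); // empty Properties in non-secure runs, so effectively a no-op
    FlinkKafkaConsumerBase<String> source =
            kafkaServer.getConsumer("example-topic", new SimpleStringSchema(), props);
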
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 14e74f1..5bcf406 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.test.util.SuccessException;
 
 
 import java.io.Serializable;
+import java.util.Properties;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
@@ -102,17 +103,24 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 				}
 			})
 			.setParallelism(1);
+
+			Properties props = new Properties();
+			props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
+			props.putAll(secureProps);
 			
 			// sink partitions into 
 			stream.addSink(kafkaServer.getProducer(topic,
 					new KeyedSerializationSchemaWrapper<>(serSchema),
-					FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings),
+					props,
 					new CustomPartitioner(parallelism)))
 			.setParallelism(parallelism);
 
 			// ------ consuming topology ---------
-			
-			FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, standardProps);
+
+			Properties consumerProps = new Properties();
+			consumerProps.putAll(standardProps);
+			consumerProps.putAll(secureProps);
+			FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
 			
 			env.addSource(source).setParallelism(parallelism)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index c4949ff..9236e78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -36,6 +36,8 @@ import org.apache.flink.util.InstantiationUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,29 +62,39 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	private static Properties standardProps;
 	private static LocalFlinkMiniCluster flink;
 
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	protected static Properties secureProps = new Properties();
+
 	@BeforeClass
 	public static void prepare() throws IOException, ClassNotFoundException {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaShortRetentionTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
 
+		Configuration flinkConfig = new Configuration();
+
 		// dynamically load the implementation for the test
 		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
 		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
 
 		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
+		if(kafkaServer.isSecureRunSupported()) {
+			secureProps = kafkaServer.getSecureProperties();
+		}
+
 		Properties specificProperties = new Properties();
 		specificProperties.setProperty("log.retention.hours", "0");
 		specificProperties.setProperty("log.retention.minutes", "0");
 		specificProperties.setProperty("log.retention.ms", "250");
 		specificProperties.setProperty("log.retention.check.interval.ms", "100");
-		kafkaServer.prepare(1, specificProperties);
+		kafkaServer.prepare(1, specificProperties, false);
 
 		standardProps = kafkaServer.getStandardProperties();
 
 		// start also a re-usable Flink mini cluster
-		Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
@@ -98,6 +110,8 @@ public class KafkaShortRetentionTestBase implements Serializable {
 			flink.shutdown();
 		}
 		kafkaServer.shutdown();
+
+		secureProps.clear();
 	}
 
 	/**
@@ -151,12 +165,17 @@ public class KafkaShortRetentionTestBase implements Serializable {
 				running = false;
 			}
 		});
-		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), standardProps, null));
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
+		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null));
 
 		// ----------- add consumer dataflow ----------
 
 		NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema();
-		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, standardProps);
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props);
 
 		DataStreamSource<String> consuming = env.addSource(source);
 		consuming.addSink(new DiscardingSink<String>());
@@ -224,6 +243,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 		Properties customProps = new Properties();
 		customProps.putAll(standardProps);
+		customProps.putAll(secureProps);
 		customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
 		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
 
@@ -255,6 +275,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 		Properties customProps = new Properties();
 		customProps.putAll(standardProps);
+		customProps.putAll(secureProps);
 		customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
 		
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 771db17..afdd158 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,61 +75,90 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static KafkaTestEnvironment kafkaServer;
 
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	protected static Properties secureProps = new Properties();
+
 	// ------------------------------------------------------------------------
 	//  Setup and teardown of the mini clusters
 	// ------------------------------------------------------------------------
 	
 	@BeforeClass
 	public static void prepare() throws IOException, ClassNotFoundException {
+
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
-		
 
+		startClusters(false);
 
-		// dynamically load the implementation for the test
-		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
-		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+	}
 
-		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+	@AfterClass
+	public static void shutDownServices() {
 
-		kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS);
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Shut down KafkaTestBase ");
+		LOG.info("-------------------------------------------------------------------------");
 
-		standardProps = kafkaServer.getStandardProperties();
-		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+		shutdownClusters();
 
-		// start also a re-usable Flink mini cluster
-		Configuration flinkConfig = new Configuration();
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    KafkaTestBase finished");
+		LOG.info("-------------------------------------------------------------------------");
+	}
+
+	protected static Configuration getFlinkConfiguration() {
+		Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
+		return flinkConfig;
+	}
+
+	protected static void startClusters(boolean secureMode) throws ClassNotFoundException {
+
+		// dynamically load the implementation for the test
+		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+
+		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+
+		kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+
+		standardProps = kafkaServer.getStandardProperties();
 
-		flink = new LocalFlinkMiniCluster(flinkConfig, false);
+		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+
+		if(kafkaServer.isSecureRunSupported() && secureMode) {
+			secureProps = kafkaServer.getSecureProperties();
+		}
+
+		// start also a re-usable Flink mini cluster
+		flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
 		flink.start();
 
 		flinkPort = flink.getLeaderRPCPort();
-	}
 
-	@AfterClass
-	public static void shutDownServices() {
+	}
 
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Shut down KafkaTestBase ");
-		LOG.info("-------------------------------------------------------------------------");
+	protected static void shutdownClusters() {
 
 		flinkPort = -1;
 		if (flink != null) {
 			flink.shutdown();
 		}
 
+		if(secureProps != null) {
+			secureProps.clear();
+		}
+
 		kafkaServer.shutdown();
 
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    KafkaTestBase finished");
-		LOG.info("-------------------------------------------------------------------------");
 	}
 
 
@@ -164,4 +195,5 @@ public abstract class KafkaTestBase extends TestLogger {
 	protected static void deleteTestTopic(String topic) {
 		kafkaServer.deleteTestTopic(topic);
 	}
+
 }

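The refactoring above splits cluster bootstrapping into startClusters(boolean secureMode) and
shutdownClusters() so that a secured test can drive the same life cycle. A minimal sketch of how
a secured IT case could hook in, assuming the SecureTestEnvironment helper introduced later in
this patch (the class name here is illustrative, not part of this commit):

    public class ExampleSecuredKafkaITCase extends KafkaTestBase {

        @BeforeClass
        public static void prepare() throws IOException, ClassNotFoundException {
            // start the MiniKDC and install the testing security context first
            SecureTestEnvironment.prepare(tempFolder);
            // then bring up Kafka and the Flink mini cluster in secure mode
            startClusters(true);
        }

        @AfterClass
        public static void shutDownServices() {
            shutdownClusters();
            SecureTestEnvironment.cleanup();
        }
    }
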
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 0b1d51d..6ecde71 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -35,10 +35,10 @@ public abstract class KafkaTestEnvironment {
 
 	protected static final String KAFKA_HOST = "localhost";
 
-	public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties);
+	public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
 
-	public void prepare(int numberOfKafkaServers) {
-		this.prepare(numberOfKafkaServers, null);
+	public void prepare(int numberOfKafkaServers, boolean secureMode) {
+		this.prepare(numberOfKafkaServers, null, secureMode);
 	}
 
 	public abstract void shutdown();
@@ -51,9 +51,10 @@ public abstract class KafkaTestEnvironment {
 		this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties());
 	}
 
-
 	public abstract Properties getStandardProperties();
 
+	public abstract Properties getSecureProperties();
+
 	public abstract String getBrokerConnectionString();
 
 	public abstract String getVersion();
@@ -86,4 +87,6 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract int getBrokerId(KafkaServer server);
 
+	public abstract boolean isSecureRunSupported();
+
 }

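A version-specific KafkaTestEnvironmentImpl now has to answer the two new abstract methods. A
hedged sketch of what an implementation for a SASL-capable broker might return (the property
keys shown are the usual Kafka 0.9+ SASL client settings and are given for illustration only):

    @Override
    public boolean isSecureRunSupported() {
        // e.g. true for Kafka 0.9+; an environment for older clients would return false
        return true;
    }

    @Override
    public Properties getSecureProperties() {
        Properties prop = new Properties();
        prop.setProperty("security.protocol", "SASL_PLAINTEXT");
        prop.setProperty("sasl.kerberos.service.name", "kafka");
        return prop;
    }
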
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 5a38e56..58a5cc3 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -135,11 +135,18 @@ public class DataGenerators {
 					}
 				});
 
+		Properties props = new Properties();
+		props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
+		Properties secureProps = testServer.getSecureProperties();
+		if(secureProps != null) {
+			props.putAll(secureProps);
+		}
+
 		stream
 				.rebalance()
 				.addSink(testServer.getProducer(topic,
 						new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
-						FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
+						props,
 						new KafkaPartitioner<Integer>() {
 							@Override
 							public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 18ecfde..5c99ef6 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -78,5 +78,30 @@ under the License.
 			<scope>compile</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+		</dependency>
+
 	</dependencies>
+
+	<build>
+		<plugins>
+
+			<!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
+
+		</plugins>
+	</build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index a478908..6ec6c2c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -25,6 +25,8 @@ import org.apache.flink.test.util.TestBaseUtils;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base class for streaming unit tests that run multiple tests and want to reuse the same
@@ -67,18 +69,22 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
 		super(new Configuration());
 	}
 
+	protected static final Logger LOG = LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class);
+
 	// ------------------------------------------------------------------------
 	//  Cluster setup & teardown
 	// ------------------------------------------------------------------------
 
 	@BeforeClass
 	public static void setup() throws Exception {
+		LOG.info("In StreamingMultipleProgramsTestBase: Starting FlinkMiniCluster ");
 		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true);
 		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
 	}
 
 	@AfterClass
 	public static void teardown() throws Exception {
+		LOG.info("In StreamingMultipleProgramsTestBase: Closing FlinkMiniCluster ");
 		TestStreamEnvironment.unsetAsContext();
 		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
new file mode 100644
index 0000000..00b19f1
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Helper class to handle the MiniKDC life cycle. It can be used to start/stop a MiniKDC
+ * instance and to create the secure configurations for MiniDFSCluster
+ * and MiniYARN.
+ */
+
+public class SecureTestEnvironment {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(SecureTestEnvironment.class);
+
+	private static MiniKdc kdc;
+
+	private static String testKeytab = null;
+
+	private static String testPrincipal = null;
+
+	private static String testZkServerPrincipal = null;
+
+	private static String testZkClientPrincipal = null;
+
+	private static String testKafkaServerPrincipal = null;
+
+	private static String hadoopServicePrincipal = null;
+
+	private static File baseDirForSecureRun = null;
+
+	public static void prepare(TemporaryFolder tempFolder) {
+
+		try {
+			baseDirForSecureRun = tempFolder.newFolder();
+			LOG.info("Base Directory for Secure Environment: {}", baseDirForSecureRun);
+
+			String hostName = "localhost";
+			Properties kdcConf = MiniKdc.createConf();
+			if(LOG.isDebugEnabled()) {
+				kdcConf.setProperty(MiniKdc.DEBUG, "true");
+			}
+			kdcConf.setProperty(MiniKdc.KDC_BIND_ADDRESS, hostName);
+			kdc = new MiniKdc(kdcConf, baseDirForSecureRun);
+			kdc.start();
+			LOG.info("Started Mini KDC");
+
+			File keytabFile = new File(baseDirForSecureRun, "test-users.keytab");
+			testKeytab = keytabFile.getAbsolutePath();
+			testZkServerPrincipal = "zookeeper/127.0.0.1";
+			testZkClientPrincipal = "zk-client/127.0.0.1";
+			testKafkaServerPrincipal = "kafka/" + hostName;
+			hadoopServicePrincipal = "hadoop/" + hostName;
+			testPrincipal = "client/" + hostName;
+
+			kdc.createPrincipal(keytabFile, testPrincipal, testZkServerPrincipal,
+					hadoopServicePrincipal,
+					testZkClientPrincipal,
+					testKafkaServerPrincipal);
+
+			testPrincipal = testPrincipal + "@" + kdc.getRealm();
+			testZkServerPrincipal = testZkServerPrincipal + "@" + kdc.getRealm();
+			testZkClientPrincipal = testZkClientPrincipal + "@" + kdc.getRealm();
+			testKafkaServerPrincipal = testKafkaServerPrincipal + "@" + kdc.getRealm();
+			hadoopServicePrincipal = hadoopServicePrincipal + "@" + kdc.getRealm();
+
+			LOG.info("-------------------------------------------------------------------");
+			LOG.info("Test Principal: {}", testPrincipal);
+			LOG.info("Test ZK Server Principal: {}", testZkServerPrincipal);
+			LOG.info("Test ZK Client Principal: {}", testZkClientPrincipal);
+			LOG.info("Test Kafka Server Principal: {}", testKafkaServerPrincipal);
+			LOG.info("Test Hadoop Service Principal: {}", hadoopServicePrincipal);
+			LOG.info("Test Keytab: {}", testKeytab);
+			LOG.info("-------------------------------------------------------------------");
+
+			//The security context is established to allow non-Hadoop applications that require JAAS-
+			//based SASL/Kerberos authentication to work. For Hadoop-specific applications, however,
+			//the context can be reinitialized with the Hadoop configuration by calling
+			//ctx.setHadoopConfiguration() so that the UGI implementation works properly.
+			//See the Yarn test case module for reference.
+			createJaasConfig(baseDirForSecureRun);
+			SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
+			Configuration flinkConfig = new Configuration();
+			flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
+			flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
+			flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
+			flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirForSecureRun.getAbsolutePath());
+			ctx.setFlinkConfiguration(flinkConfig);
+			TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
+
+			populateSystemEnvVariables();
+
+		} catch(Exception e) {
+			LOG.error("Exception occurred while preparing secure environment.", e);
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	public static void cleanup() {
+
+		LOG.info("Cleaning up Secure Environment");
+
+		if (kdc != null) {
+			kdc.stop();
+			LOG.info("Stopped KDC server");
+		}
+
+		resetSystemEnvVariables();
+
+		testKeytab = null;
+		testPrincipal = null;
+		testZkServerPrincipal = null;
+		hadoopServicePrincipal = null;
+		baseDirForSecureRun = null;
+
+	}
+
+	private static void populateSystemEnvVariables() {
+
+		if(LOG.isDebugEnabled()) {
+			System.setProperty("FLINK_JAAS_DEBUG", "true");
+			System.setProperty("sun.security.krb5.debug", "true");
+		}
+
+		System.setProperty("java.security.krb5.conf", kdc.getKrb5conf().getAbsolutePath());
+
+		System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+		System.setProperty("zookeeper.kerberos.removeHostFromPrincipal", "true");
+		System.setProperty("zookeeper.kerberos.removeRealmFromPrincipal", "true");
+	}
+
+	private static void resetSystemEnvVariables() {
+		System.clearProperty("java.security.krb5.conf");
+		System.clearProperty("FLINK_JAAS_DEBUG");
+		System.clearProperty("sun.security.krb5.debug");
+
+		System.clearProperty("zookeeper.authProvider.1");
+		System.clearProperty("zookeeper.kerberos.removeHostFromPrincipal");
+		System.clearProperty("zookeeper.kerberos.removeRealmFromPrincipal");
+	}
+
+	public static org.apache.flink.configuration.Configuration populateFlinkSecureConfigurations(
+			@Nullable org.apache.flink.configuration.Configuration flinkConf) {
+
+		org.apache.flink.configuration.Configuration conf;
+
+		if (flinkConf == null) {
+			conf = new org.apache.flink.configuration.Configuration();
+		} else {
+			conf = flinkConf;
+		}
+
+		conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
+		conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
+
+		return conf;
+	}
+
+	public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> getClientSecurityConfigurationMap() {
+
+		Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap = new HashMap<>();
+
+		if (testZkServerPrincipal != null) {
+			TestingSecurityContext.ClientSecurityConfiguration zkServer =
+					new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab,
+							"Server", "zk-server");
+			clientSecurityConfigurationMap.put("Server", zkServer);
+		}
+
+		if (testZkClientPrincipal != null) {
+			TestingSecurityContext.ClientSecurityConfiguration zkClient =
+					new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab,
+							"Client", "zk-client");
+			clientSecurityConfigurationMap.put("Client", zkClient);
+		}
+
+		if (testKafkaServerPrincipal != null) {
+			TestingSecurityContext.ClientSecurityConfiguration kafkaServer =
+					new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab,
+							"KafkaServer", "kafka-server");
+			clientSecurityConfigurationMap.put("KafkaServer", kafkaServer);
+		}
+
+		return clientSecurityConfigurationMap;
+	}
+
+	public static String getTestKeytab() {
+		return testKeytab;
+	}
+
+	public static String getHadoopServicePrincipal() {
+		return hadoopServicePrincipal;
+	}
+
+	/*
+	 * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL
+	 * implementations' lookup of java.security.auth.login.config
+	 */
+	private static void createJaasConfig(File baseDirForSecureRun) {
+
+		try (FileWriter fw = new FileWriter(new File(baseDirForSecureRun, SecurityContext.JAAS_CONF_FILENAME), true);
+			BufferedWriter bw = new BufferedWriter(fw);
+			PrintWriter out = new PrintWriter(bw))
+		{
+			out.println("sample {");
+			out.println("useKeyTab=false");
+			out.println("useTicketCache=true;");
+			out.println("};");
+		} catch (IOException e) {
+			LOG.error("Exception occurred while trying to create JAAS config.", e);
+			throw new RuntimeException(e);
+		}
+
+	}
+}

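For reference, createJaasConfig above appends only a placeholder entry to the JAAS file: the file
exists so that java.security.auth.login.config points at something readable, while the actual
per-application entries are served programmatically by TestingJaasConfiguration. The generated
file content is:

    sample {
    useKeyTab=false
    useTicketCache=true;
    };
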
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
new file mode 100644
index 0000000..25b2362
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.JaasConfiguration;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link TestingJaasConfiguration} for the integration test cases, which need to manage the client
+ * principal as well as the server principals of Hadoop/ZK, which expect the host name to be populated
+ * in a specific way (localhost vs. 127.0.0.1). It provides an abstraction for handling more than one login
+ * module, since the default {@link JaasConfiguration} supports only a global/unique principal identifier.
+ */
+
+@Internal
+public class TestingJaasConfiguration extends JaasConfiguration {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TestingJaasConfiguration.class);
+
+	public Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap;
+
+	TestingJaasConfiguration(String keytab, String principal, Map<String,
+			TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap) {
+		super(keytab, principal);
+		this.clientSecurityConfigurationMap = clientSecurityConfigurationMap;
+	}
+
+	@Override
+	public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
+
+		LOG.debug("In TestingJaasConfiguration - Application Requested: {}", applicationName);
+
+		AppConfigurationEntry[] appConfigurationEntry = super.getAppConfigurationEntry(applicationName);
+
+		if(clientSecurityConfigurationMap != null && clientSecurityConfigurationMap.size() > 0) {
+
+			if(clientSecurityConfigurationMap.containsKey(applicationName)) {
+
+				LOG.debug("In TestingJaasConfiguration - Application: {} found in the supplied context", applicationName);
+
+				TestingSecurityContext.ClientSecurityConfiguration conf = clientSecurityConfigurationMap.get(applicationName);
+
+				if(appConfigurationEntry != null && appConfigurationEntry.length > 0) {
+
+					for(int count=0; count < appConfigurationEntry.length; count++) {
+
+						AppConfigurationEntry ace = appConfigurationEntry[count];
+
+						if (ace.getOptions().containsKey("keyTab")) {
+
+							String keyTab = conf.getKeytab();
+							String principal = conf.getPrincipal();
+
+							LOG.debug("In TestingJaasConfiguration - Application: {} from the supplied context will " +
+									"use Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, principal);
+
+							Map<String, String> newKeytabKerberosOptions = new HashMap<>();
+							newKeytabKerberosOptions.putAll(getKeytabKerberosOptions());
+
+							newKeytabKerberosOptions.put("keyTab", keyTab);
+							newKeytabKerberosOptions.put("principal", principal);
+
+							AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
+									KerberosUtil.getKrb5LoginModuleName(),
+									AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+									newKeytabKerberosOptions);
+							appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce};
+
+							LOG.debug("---->Login Module is using Keytab based configuration<------");
+							LOG.debug("Login Module Name: " + keytabKerberosAce.getLoginModuleName());
+							LOG.debug("Control Flag: " + keytabKerberosAce.getControlFlag());
+							LOG.debug("Options: " + keytabKerberosAce.getOptions());
+						}
+					}
+				}
+			}
+
+		}
+
+		return appConfigurationEntry;
+	}
+
+}

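The override above means that a lookup for an application name registered in the client map gets
its keytab/principal options rewritten, while unknown names fall through to the base
JaasConfiguration. A hedged usage sketch (the names mirror those used in SecureTestEnvironment;
the constructor is package-private, so this would run from the same package):

    Map<String, TestingSecurityContext.ClientSecurityConfiguration> clients =
            SecureTestEnvironment.getClientSecurityConfigurationMap();
    TestingJaasConfiguration conf =
            new TestingJaasConfiguration(keytab, principal, clients);

    // "Client" is mapped to the zk-client credentials, so the returned entry
    // carries that principal instead of the global one
    AppConfigurationEntry[] entries = conf.getAppConfigurationEntry("Client");
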
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
new file mode 100644
index 0000000..5e84c7e
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.SecurityContext;
+
+import java.util.Map;
+
+/*
+ * Test security context to support handling both client and server principals in MiniKDC.
+ * This class is used only in integration test code for connectors like Kafka, HDFS, etc.
+ */
+@Internal
+public class TestingSecurityContext {
+
+	public static void install(SecurityContext.SecurityConfiguration config,
+						Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap)
+			throws Exception {
+
+		SecurityContext.install(config);
+
+		// establish the JAAS config for Test environment
+		TestingJaasConfiguration jaasConfig = new TestingJaasConfiguration(config.getKeytab(),
+				config.getPrincipal(), clientSecurityConfigurationMap);
+		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+	}
+
+	public static class ClientSecurityConfiguration {
+
+		private String principal;
+
+		private String keytab;
+
+		private String moduleName;
+
+		private String jaasServiceName;
+
+		public String getPrincipal() {
+			return principal;
+		}
+
+		public String getKeytab() {
+			return keytab;
+		}
+
+		public String getModuleName() {
+			return moduleName;
+		}
+
+		public String getJaasServiceName() {
+			return jaasServiceName;
+		}
+
+		public ClientSecurityConfiguration(String principal, String keytab, String moduleName, String jaasServiceName) {
+			this.principal = principal;
+			this.keytab = keytab;
+			this.moduleName = moduleName;
+			this.jaasServiceName = jaasServiceName;
+		}
+
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index ffdca36..68e4752 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -103,6 +103,13 @@ under the License.
 			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+		</dependency>
+
 	</dependencies>
 
 	<build>
@@ -298,6 +305,19 @@ under the License.
 					<skip>true</skip>
 				</configuration>
 			</plugin>
+
+			<!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
+			
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index d03d9eb..a503115 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -180,7 +180,7 @@ public class FlinkYarnSessionCliTest {
 				Mockito.mock(YarnClient.class),
 				Mockito.mock(ApplicationReport.class),
 				config,
-				new Path("/tmp"), false);
+				new Path("/temp"), false);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a293348..9d6ff85 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -50,14 +50,14 @@ import java.util.concurrent.TimeUnit;
 
 public class YARNHighAvailabilityITCase extends YarnTestBase {
 
-	private static TestingServer zkServer;
+	protected static TestingServer zkServer;
 
-	private static ActorSystem actorSystem;
+	protected static ActorSystem actorSystem;
 
-	private static final int numberApplicationAttempts = 10;
+	protected static final int numberApplicationAttempts = 10;
 
 	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
+	public TemporaryFolder temp = new TemporaryFolder();
 
 	@BeforeClass
 	public static void setup() {
@@ -108,7 +108,11 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		flinkYarnClient.setConfigurationDirectory(confDirPath);
 
-		String fsStateHandlePath = tmp.getRoot().getPath();
+		String fsStateHandlePath = temp.getRoot().getPath();
+
+		// load the configuration
+		File configDirectory = new File(confDirPath);
+		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 
 		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
 		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index ddea4dd..650397d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Joiner;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -516,6 +518,27 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		} catch(Throwable t) {
 			LOG.warn("Error while detached yarn session was running", t);
 			Assert.fail(t.getMessage());
+		} finally {
+
+			// cleanup the yarn-properties file
+			String confDirPath = System.getenv("FLINK_CONF_DIR");
+			File configDirectory = new File(confDirPath);
+			LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + configDirectory.getAbsolutePath());
+
+			// load the configuration
+			LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
+			GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
+			try {
+				File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
+				if (yarnPropertiesFile.exists()) {
+					LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath());
+					yarnPropertiesFile.delete();
+				}
+			} catch (Exception e) {
+				LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e);
+			}
+
 		}
 	}
 

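Note: the finally block above is duplicated almost verbatim in YARNSessionFIFOITCase below;
a shared helper on YarnTestBase would avoid the copy. A sketch under that assumption
(cleanupYarnPropertiesFile is hypothetical, not part of this commit), reusing only calls
that appear in the diff:

    // Hypothetical YarnTestBase helper: deletes the yarn-properties file written
    // by a detached session, resolving the configuration from FLINK_CONF_DIR.
    protected static void cleanupYarnPropertiesFile() {
        String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
        if (confDirPath == null) {
            return;
        }
        GlobalConfiguration.loadConfiguration(new File(confDirPath).getAbsolutePath());
        try {
            File yarnPropertiesFile =
                FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
            if (yarnPropertiesFile.exists() && !yarnPropertiesFile.delete()) {
                LOG.warn("Could not delete the yarn properties file {}", yarnPropertiesFile.getAbsolutePath());
            }
        } catch (Exception e) {
            LOG.warn("Exception while deleting the JobManager address file", e);
        }
    }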
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 3caa0ee..ca696f9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -100,6 +100,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		while(getRunningContainers() < 2) {
 			sleep(500);
 		}
+
+		// additional sleep for the JM/TM to start and establish a connection
+		sleep(2000);
 		LOG.info("Two containers are running. Killing the application");
 
 		// kill application "externally".
@@ -121,6 +124,27 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		} catch(Throwable t) {
 			LOG.warn("Killing failed", t);
 			Assert.fail();
+		} finally {
+
+			// cleanup the yarn-properties file
+			String confDirPath = System.getenv("FLINK_CONF_DIR");
+			File configDirectory = new File(confDirPath);
+			LOG.info("testDetachedMode: Using configuration directory " + configDirectory.getAbsolutePath());
+
+			// load the configuration
+			LOG.info("testDetachedMode: Trying to load configuration file");
+			GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
+			try {
+				File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
+				if (yarnPropertiesFile.exists()) {
+					LOG.info("testDetachedMode: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath());
+					yarnPropertiesFile.delete();
+				}
+			} catch (Exception e) {
+				LOG.warn("testDetachedMode: Exception while deleting the JobManager address file", e);
+			}
+
 		}
 
 		LOG.info("Finished testDetachedMode()");

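Note: the fixed sleep(2000) added above can still race on slow build machines; a bounded
polling loop is the more robust pattern. A sketch, assuming a hypothetical waitUntil helper
on the test base:

    // Hypothetical helper: polls a condition until it holds or the deadline
    // passes, instead of relying on a fixed sleep.
    protected static void waitUntil(java.util.concurrent.Callable<Boolean> condition, long timeoutMillis)
            throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (condition.call()) {
                return;
            }
            Thread.sleep(250);
        }
        throw new AssertionError("Condition not met within " + timeoutMillis + " ms");
    }

    // Example usage (anonymous class keeps it Java-7 compatible):
    // waitUntil(new Callable<Boolean>() {
    //     public Boolean call() { return getRunningContainers() >= 2; }
    // }, 30000);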
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
new file mode 100644
index 0000000..0b7c230
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOSecuredITCase.class);
+
+	@BeforeClass
+	public static void setup() {
+
+		LOG.info("starting secure cluster environment for testing");
+
+		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
+		yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
+		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo-secured");
+
+		SecureTestEnvironment.prepare(tmp);
+
+		populateYarnSecureConfigurations(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(),
+				SecureTestEnvironment.getTestKeytab());
+
+		Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+				SecureTestEnvironment.getTestKeytab());
+		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+				SecureTestEnvironment.getHadoopServicePrincipal());
+
+		SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
+		ctx.setFlinkConfiguration(flinkConfig);
+		ctx.setHadoopConfiguration(yarnConfiguration);
+		try {
+			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
+
+			SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					startYARNSecureMode(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(),
+							SecureTestEnvironment.getTestKeytab());
+					return null;
+				}
+			});
+
+		} catch (Exception e) {
+			throw new RuntimeException("Exception occurred while setting up the secure test context.", e);
+		}
+
+	}
+
+	@AfterClass
+	public static void teardownSecureCluster() throws Exception {
+		LOG.info("tearing down secure cluster environment");
+		SecureTestEnvironment.cleanup();
+	}
+
+	/* For secure cluster testing, it is enough to run a single test; the test methods below are
+	 * overridden with empty bodies to keep the overall build time minimal.
+	 */
+	@Override
+	public void testQueryCluster() {}
+
+	@Override
+	public void testNonexistingQueue() {}
+
+	@Override
+	public void testResourceComputation() {}
+
+	@Override
+	public void testfullAlloc() {}
+
+	@Override
+	public void testJavaAPI() {}
+}

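Note: overriding the inherited tests with empty bodies makes them pass silently;
re-declaring them with JUnit's @Ignore would make the skip visible in the build output.
A sketch of that alternative (not what this commit does):

    @org.junit.Ignore("covered by the secured smoke test; skipped to keep the build time minimal")
    @org.junit.Test
    @Override
    public void testQueryCluster() {
        // intentionally skipped; reported as "ignored" instead of passing silently
    }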
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 6270010..605aa44 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import akka.actor.Identify;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.ClusterClient;
@@ -29,6 +30,8 @@ import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -62,6 +65,8 @@ import java.io.FileWriter;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -123,6 +128,11 @@ public abstract class YarnTestBase extends TestLogger {
 	 */
 	protected static File flinkLibFolder;
 
+	/**
+	 * Temporary folder where the Flink configuration is kept for a secure run.
+	 */
+	protected static File tempConfPathForSecureRun = null;
+
 	static {
 		yarnConfiguration = new YarnConfiguration();
 		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
@@ -140,6 +150,23 @@ public abstract class YarnTestBase extends TestLogger {
 	}
 
 
+	public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) {
+
+		conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+		conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+
+		conf.set(YarnConfiguration.RM_KEYTAB, keytab);
+		conf.set(YarnConfiguration.RM_PRINCIPAL, principal);
+		conf.set(YarnConfiguration.NM_KEYTAB, keytab);
+		conf.set(YarnConfiguration.NM_PRINCIPAL, principal);
+
+		conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
+		conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab);
+		conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
+		conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab);
+
+		conf.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]");
+	}
 
 	/**
 	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
@@ -336,8 +363,16 @@ public abstract class YarnTestBase extends TestLogger {
 		return count;
 	}
 
+	public static void startYARNSecureMode(Configuration conf, String principal, String keytab) {
+		start(conf, principal, keytab);
+	}
+
 	public static void startYARNWithConfig(Configuration conf) {
-		// set the home directory to a tmp directory. Flink on YARN is using the home dir to distribute the file
+		start(conf, null, null);
+	}
+
+	private static void start(Configuration conf, String principal, String keytab) {
+		// set the home directory to a temp directory. Flink on YARN is using the home dir to distribute the file
 		File homeDir = null;
 		try {
 			homeDir = tmp.newFolder();
@@ -374,7 +409,39 @@ public abstract class YarnTestBase extends TestLogger {
 			File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
 			Assert.assertNotNull(flinkConfDirPath);
 
-			map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent());
+			if (!StringUtils.isBlank(principal) && !StringUtils.isBlank(keytab)) {
+				// copy the conf dir to the test's temporary workspace location
+				tempConfPathForSecureRun = tmp.newFolder("conf");
+
+				String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath();
+				FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
+
+				try (FileWriter fw = new FileWriter(new File(tempConfPathForSecureRun, "flink-conf.yaml"), true);
+					BufferedWriter bw = new BufferedWriter(fw);
+					PrintWriter out = new PrintWriter(bw))
+				{
+					LOG.info("Writing keytab {} and principal {} to the Flink configuration file", keytab, principal);
+					out.println("");
+					out.println("# Security configurations (auto-populated)");
+					out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab);
+					out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
+					out.println("");
+				} catch (IOException e) {
+					LOG.error("Exception occurred while trying to append the security configurations. Reason: {}", e.getMessage());
+					throw new RuntimeException(e);
+				}
+
+				String configDir = tempConfPathForSecureRun.getAbsolutePath();
+
+				LOG.info("Temporary Flink configuration directory to be used for secure test: {}", configDir);
+
+				Assert.assertNotNull(configDir);
+
+				map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
+
+			} else {
+				map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent());
+			}
 
 			File yarnConfFile = writeYarnSiteConfigXML(conf);
 			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
@@ -392,6 +459,7 @@ public abstract class YarnTestBase extends TestLogger {
 			LOG.error("setup failure", ex);
 			Assert.fail();
 		}
+
 	}
 
 	/**
@@ -421,7 +489,6 @@ public abstract class YarnTestBase extends TestLogger {
 		System.setOut(new PrintStream(outContent));
 		System.setErr(new PrintStream(errContent));
 
-
 		final int START_TIMEOUT_SECONDS = 60;
 
 		Runner runner = new Runner(args, type);
@@ -624,12 +691,23 @@ public abstract class YarnTestBase extends TestLogger {
 
 	@AfterClass
 	public static void teardown() throws Exception {
+
+		LOG.info("Stopping the MiniYARNCluster");
+		yarnCluster.stop();
+
 		// Unset FLINK_CONF_DIR, as it might change the behavior of other tests
 		Map<String, String> map = new HashMap<>(System.getenv());
 		map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
+		map.remove("YARN_CONF_DIR");
+		map.remove("IN_TESTS");
 		TestBaseUtils.setEnv(map);
 
-		// When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
+		if (tempConfPathForSecureRun != null) {
+			FileUtil.fullyDelete(tempConfPathForSecureRun);
+			tempConfPathForSecureRun = null;
+		}
+
+		// When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files)
 		// to <flinkRoot>/target/flink-yarn-tests-*.
 		// The files from there are picked up by the ./tools/travis_watchdog.sh script
 		// to upload them to Amazon S3.
@@ -646,6 +724,7 @@ public abstract class YarnTestBase extends TestLogger {
 				LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e);
 			}
 		}
+
 	}
 
 	public static boolean isOnTravis() {

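Note: teardown() above removes three specific environment keys; snapshotting the whole
environment once and restoring it afterwards is less fragile if more keys are introduced
later. A sketch reusing the TestBaseUtils.setEnv hook from the diff (the snapshot field
and methods are hypothetical):

    // Hypothetical variant: save the full environment before the tests mutate it
    // and restore it wholesale in teardown.
    private static Map<String, String> originalEnv;

    @BeforeClass
    public static void snapshotEnvironment() {
        originalEnv = new HashMap<>(System.getenv());
    }

    @AfterClass
    public static void restoreEnvironment() throws Exception {
        TestBaseUtils.setEnv(originalEnv);
    }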
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties
index e94ca26..8f56c1f 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -34,3 +34,8 @@ log4j.logger.org.apache.hadoop=OFF
 log4j.logger.org.apache.flink.runtime.leaderelection=INFO
 log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
 
+log4j.logger.org.apache.directory=OFF
+log4j.logger.org.mortbay.log=OFF, testlogger
+log4j.logger.net.sf.ehcache=OFF
+log4j.logger.org.apache.hadoop.metrics2=OFF
+log4j.logger.org.apache.hadoop.conf.Configuration=OFF