Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:46 UTC

[13/50] [abbrv] flink git commit: [FLINK-3929] Added Keytab based Kerberos support to enable secure Flink cluster deployment (addresses HDFS, Kafka and ZK services)

[FLINK-3929] Added Keytab based Kerberos support to enable secure Flink cluster deployment (addresses HDFS, Kafka and ZK services)

FLINK-3929 Added MiniKDC support for Kafka, Zookeeper, RollingFS and Yarn integration test modules


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25a622fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25a622fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25a622fd

Branch: refs/heads/flip-6
Commit: 25a622fd9c9255bd1a5b4b6ff7891730dce34ac1
Parents: 303f6fe
Author: Vijay Srinivasaraghavan <vi...@emc.com>
Authored: Wed Jul 20 17:08:33 2016 -0700
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200

----------------------------------------------------------------------
 docs/internals/flink_security.md                |  87 ++++++
 docs/setup/config.md                            |  27 +-
 .../org/apache/flink/client/CliFrontend.java    |  38 +--
 .../flink/configuration/ConfigConstants.java    |  22 ++
 .../org/apache/flink/util/Preconditions.java    |   2 +-
 .../src/main/flink-bin/conf/flink-jaas.conf     |  26 ++
 flink-dist/src/main/resources/flink-conf.yaml   |  25 ++
 .../MesosApplicationMasterRunner.java           |   3 +-
 .../clusterframework/BootstrapTools.java        |   7 +
 .../runtime/security/JaasConfiguration.java     | 160 ++++++++++
 .../flink/runtime/security/SecurityContext.java | 313 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  34 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  25 +-
 .../runtime/security/JaasConfigurationTest.java |  52 +++
 .../runtime/security/SecurityContextTest.java   |  77 +++++
 .../flink-connector-filesystem/pom.xml          |  36 +++
 .../connectors/fs/RollingSinkITCase.java        |  19 +-
 .../connectors/fs/RollingSinkSecuredITCase.java | 232 ++++++++++++++
 .../src/test/resources/log4j-test.properties    |   2 +
 .../connectors/kafka/Kafka08ProducerITCase.java |   1 -
 .../kafka/KafkaTestEnvironmentImpl.java         |  13 +-
 .../flink-connector-kafka-0.9/pom.xml           |  18 ++
 .../connectors/kafka/Kafka09ITCase.java         |  14 +-
 .../connectors/kafka/Kafka09ProducerITCase.java |   1 -
 .../kafka/Kafka09SecureRunITCase.java           |  62 ++++
 .../kafka/KafkaTestEnvironmentImpl.java         |  81 ++++-
 .../src/test/resources/log4j-test.properties    |   2 +
 .../flink-connector-kafka-base/pom.xml          |  19 ++
 .../connectors/kafka/KafkaConsumerTestBase.java | 100 ++++--
 .../connectors/kafka/KafkaProducerTestBase.java |  14 +-
 .../kafka/KafkaShortRetentionTestBase.java      |  29 +-
 .../connectors/kafka/KafkaTestBase.java         |  72 +++--
 .../connectors/kafka/KafkaTestEnvironment.java  |  11 +-
 .../kafka/testutils/DataGenerators.java         |   9 +-
 .../flink-test-utils/pom.xml                    |  25 ++
 .../util/StreamingMultipleProgramsTestBase.java |   6 +
 .../flink/test/util/SecureTestEnvironment.java  | 249 +++++++++++++++
 .../test/util/TestingJaasConfiguration.java     | 106 +++++++
 .../flink/test/util/TestingSecurityContext.java |  80 +++++
 flink-yarn-tests/pom.xml                        |  20 ++
 .../flink/yarn/FlinkYarnSessionCliTest.java     |   2 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  14 +-
 .../YARNSessionCapacitySchedulerITCase.java     |  23 ++
 .../flink/yarn/YARNSessionFIFOITCase.java       |  24 ++
 .../yarn/YARNSessionFIFOSecuredITCase.java      | 103 ++++++
 .../org/apache/flink/yarn/YarnTestBase.java     |  87 +++++-
 .../src/test/resources/log4j-test.properties    |   5 +
 .../yarn/AbstractYarnClusterDescriptor.java     | 104 ++++--
 .../main/java/org/apache/flink/yarn/Utils.java  |   8 +
 .../flink/yarn/YarnApplicationMasterRunner.java | 158 ++++++++--
 .../org/apache/flink/yarn/YarnConfigKeys.java   |   6 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  88 ++++--
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  28 +-
 pom.xml                                         |   7 +
 tools/log4j-travis.properties                   |   3 +
 55 files changed, 2553 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
new file mode 100644
index 0000000..846273b
--- /dev/null
+++ b/docs/internals/flink_security.md
@@ -0,0 +1,87 @@
+---
+title:  "Flink Security"
+# Top navigation
+top-nav-group: internals
+top-nav-pos: 10
+top-nav-title: Flink Security
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This document briefly describes how Flink security works in the context of the various deployment mechanisms (standalone cluster vs. YARN) 
+and the connectors that participate in a Flink job's execution. This documentation can be helpful for both administrators and developers 
+who plan to run Flink in a secure environment.
+
+## Objective
+
+The primary goal of the Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario, 
+streaming jobs are understood to run for long periods of time (days/weeks/months), and the system must be able to authenticate against secure 
+data sources throughout the life of the job. The current implementation supports running Flink clusters (JobManager/TaskManager/jobs) under the 
+context of a Kerberos identity based on a keytab credential supplied at deployment time. Any job submitted will continue to run under the identity of the cluster.
+
+## How Flink Security works
+A Flink deployment includes running the JobManager/ZooKeeper, TaskManager(s), the web UI, and job(s). Jobs (user code) can be submitted through the web UI and/or the CLI. 
+A job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis, etc.), and each connector may have specific security 
+requirements (Kerberos, database-based, SSL/TLS, custom, etc.). While satisfying the security requirements for all connectors is an ongoing effort, 
+at the time of writing the following connectors/services have been tested for Kerberos/keytab-based security:
+
+- Kafka (0.9)
+- HDFS
+- ZooKeeper
+
+Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation
+(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context.
+
+Services like Kafka and ZooKeeper use a SASL/JAAS-based authentication mechanism to authenticate against a Kerberos server. They expect a JAAS configuration with a platform-specific login 
+module *name* to be provided. Managing per-connector configuration files would be an overhead; to avoid this requirement, a process-wide JAAS configuration object is 
+instantiated which serves a standard AppConfigurationEntry for the connectors that authenticate using the SASL/JAAS mechanism.
+
+It is important to understand that the Flink processes (JM/TM/UI/jobs) themselves use UGI's doAs() implementation to run under a specific user context, i.e. if Hadoop security is enabled 
+then the Flink processes will run under a secure user account, otherwise they will run as the OS login user account that starts the Flink cluster.
+
+## Security Configurations
+
+Secure credentials can be supplied by adding the following configuration elements to the Flink configuration file:
+
+- `security.keytab`: Absolute path to the Kerberos keytab file that contains the user credentials/secret.
+
+- `security.principal`: User principal name that the Flink cluster should run as.
+
+The delegation token mechanism (*kinit cache*) is still supported for backward compatibility, but enabling security using the *keytab* configuration is the preferred and recommended approach.
+
+## Standalone Mode
+
+Steps to run a secure Flink cluster in standalone/cluster mode:
+- Add the security configurations to the Flink configuration file (on all cluster nodes) 
+- Make sure the keytab file exists at the path indicated by the *security.keytab* configuration on all cluster nodes
+- Deploy the Flink cluster using the cluster start/stop scripts or the CLI
+
+## Yarn Mode
+
+Steps to run a secure Flink cluster in Yarn mode:
+- Add the security configurations to the Flink configuration file (on the node from which the cluster will be provisioned using the Flink/Yarn CLI) 
+- Make sure the keytab file exists at the path indicated by the *security.keytab* configuration
+- Deploy the Flink cluster using the CLI
+
+In Yarn mode, the user-supplied keytab will be copied over to the Yarn containers (App Master/JM and TM) as a Yarn local resource file.
+Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">Yarn security</a>.
+
+## Token Renewal
+
+The UGI and Kafka/ZK login module implementations take care of auto-renewing the tickets upon expiry, and no further action is needed on the part of Flink.
\ No newline at end of file
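
As an illustration of the configuration described above, a minimal flink-conf.yaml fragment
enabling keytab mode could look like the following (the keytab path and principal are
placeholder values):

    security.keytab: /etc/security/keytabs/flink.keytab
    security.principal: flink-user@EXAMPLE.COM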

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 51475cc..54ef394 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -95,18 +95,35 @@ These options are useful for debugging a Flink application for memory and garbag
 
 ### Kerberos
 
-Flink supports Kerberos authentication of Hadoop services such as HDFS, YARN, or HBase.
+Flink supports Kerberos authentication for the following services:
+
++ Hadoop components, such as HDFS, YARN, or HBase
++ Kafka connectors (version 0.9+)
++ ZooKeeper server/client
+
+Hadoop components rely on the UserGroupInformation (UGI) implementation to handle Kerberos authentication, whereas the Kafka and ZooKeeper services handle Kerberos authentication through a SASL/JAAS implementation.
 
 **Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
   other versions have critical bugs which might fail the Flink job
   unexpectedly.**
 
+**Ticket cache** and **Keytab** modes are supported for all of the above-mentioned services.
+
+> Ticket cache (supported only to provide backward compatibility; keytab is the preferred approach for long-running jobs)
+
 While Hadoop uses Kerberos tickets to authenticate users with services initially, the authentication process continues differently afterwards. Instead of saving the ticket to authenticate on a later access, Hadoop creates its own security tokens (DelegationToken) that it passes around. These are authenticated to Kerberos periodically but are independent of the token renewal time. The tokens have a maximum life span identical to the Kerberos ticket maximum life span.
 
-Please make sure to set the maximum ticket life span high long running jobs. The renewal time of the ticket, on the other hand, is not important because Hadoop abstracts this away using its own security tocken renewal system. Hadoop makes sure that tickets are renewed in time and you can be sure to be authenticated until the end of the ticket life time.
+While using the ticket cache mode, please make sure to set the maximum ticket life span high for long-running jobs. 
 
 If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.
 
+> Keytab (the security principal and keytab can be configured through the Flink configuration file)
+- `security.keytab`: Path to the keytab file
+- `security.principal`: Principal associated with the keytab
+
+Kerberos ticket renewal is abstracted and automatically handled by the Hadoop/Kafka/ZK login modules, which ensure that tickets are renewed in time so that you remain authenticated until the end of the ticket lifetime.
+
+For Kafka and ZK, a process-wide JAAS config will be created using the provided security credentials, and Kerberos authentication will be handled by the Kafka/ZK login handlers.
 
 ### Other
 
@@ -315,6 +332,12 @@ Previously this key was named `recovery.mode` and the default value was `standal
 
 - `high-availability.job.delay`: (Default `akka.ask.timeout`) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named `recovery.job.delay`.
 
+### ZooKeeper Security
+
+- `zookeeper.sasl.disable`: (Default: `true`) Defines whether SASL-based authentication is disabled. Set the value to "false" if the ZooKeeper cluster is running in secure mode (Kerberos).
+
+- `zookeeper.sasl.service-name`: (Default: `zookeeper`) If the ZooKeeper server is configured with a service name other than the default ("zookeeper"), it can be supplied using this configuration. A mismatch in service name between the client and server configuration will cause the authentication to fail.
+
 ## Environment
 
 - `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. It has to be an absolute path.
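
For the ticket cache mode described above, a sketch of the manual authentication step on a
standalone cluster node (the principal is a placeholder; `klist` merely verifies the result):

    kinit flink-user@EXAMPLE.COM
    klist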

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index c7fb647..575ffad 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -66,7 +66,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -161,6 +161,9 @@ public class CliFrontend {
 				"filesystem scheme from configuration.", e);
 		}
 
+		this.config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDirectory.getAbsolutePath()
+				+ File.separator + "..");
+
 		this.clientTimeout = AkkaUtils.getClientTimeout(config);
 	}
 
@@ -982,25 +985,7 @@ public class CliFrontend {
 		// do action
 		switch (action) {
 			case ACTION_RUN:
-				// run() needs to run in a secured environment for the optimizer.
-				if (SecurityUtils.isSecurityEnabled()) {
-					String message = "Secure Hadoop environment setup detected. Running in secure context.";
-					LOG.info(message);
-
-					try {
-						return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() {
-							@Override
-							public Integer run() throws Exception {
-								return CliFrontend.this.run(params);
-							}
-						});
-					}
-					catch (Exception e) {
-						return handleError(e);
-					}
-				} else {
-					return run(params);
-				}
+				return CliFrontend.this.run(params);
 			case ACTION_LIST:
 				return list(params);
 			case ACTION_INFO:
@@ -1037,12 +1022,19 @@ public class CliFrontend {
 	/**
 	 * Submits the job based on the arguments
 	 */
-	public static void main(String[] args) {
+	public static void main(final String[] args) {
 		EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
 
 		try {
-			CliFrontend cli = new CliFrontend();
-			int retCode = cli.parseParameters(args);
+			final CliFrontend cli = new CliFrontend();
+			SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(cli.config));
+			int retCode = SecurityContext.getInstalled()
+					.runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+						@Override
+						public Integer run() {
+							return cli.parseParameters(args);
+						}
+					});
 			System.exit(retCode);
 		}
 		catch (Throwable t) {
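
The install-then-run pattern shown in this entry point generalizes to the other process entry
points touched by this commit. A condensed, hypothetical sketch using only the API introduced
here (error handling elided; `config` and `doWork()` are assumed to exist):

    // Bootstrap the process-wide security context from the Flink configuration,
    // then execute all subsequent work inside UGI.doAs().
    SecurityContext.install(new SecurityContext.SecurityConfiguration()
            .setFlinkConfiguration(config));

    int retCode = SecurityContext.getInstalled()
            .runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
                @Override
                public Integer run() throws Exception {
                    return doWork(); // runs under the installed Kerberos identity
                }
            });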

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f0f1b6b..9e66e2a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -758,6 +758,12 @@ public final class ConfigConstants {
 	@PublicEvolving
 	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts";
 
+	@PublicEvolving
+	public static final String ZOOKEEPER_SASL_DISABLE = "zookeeper.sasl.disable";
+
+	@PublicEvolving
+	public static final String ZOOKEEPER_SASL_SERVICE_NAME = "zookeeper.sasl.service-name";
+
 	/** Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
 	@Deprecated
 	public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
@@ -1233,6 +1239,9 @@ public final class ConfigConstants {
 	/** ZooKeeper default leader port. */
 	public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
 
+	/** Defaults for ZK client security **/
+	public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true;
+
 	// ------------------------- Queryable state ------------------------------
 
 	/** Port to bind KvState server to. */
@@ -1279,6 +1288,19 @@ public final class ConfigConstants {
 	/** The environment variable name which contains the location of the lib folder */
 	public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
+	// -------------------------------- Security -------------------------------
+
+	/*
+	 * Config parameters defining the security credentials required
+	 * for securing a Flink cluster.
+	 */
+
+	/** Keytab file key name to be used in flink configuration file */
+	public static final String SECURITY_KEYTAB_KEY = "security.keytab";
+
+	/** Kerberos security principal key name to be used in flink configuration file */
+	public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
+
 
 	/**
 	 * Not instantiable.
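
A short sketch of how these new keys are read from a Flink Configuration object (this mirrors
what SecurityConfiguration.setFlinkConfiguration() does later in this commit; `flinkConf` is
assumed to be an already-loaded org.apache.flink.configuration.Configuration):

    // null defaults signal "not configured" and trigger the ticket cache fallback
    String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
    String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
    boolean zkSaslDisabled = flinkConf.getBoolean(
            ConfigConstants.ZOOKEEPER_SASL_DISABLE,
            ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);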

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
index ea6b9dd..e970c13 100644
--- a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
+++ b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
@@ -293,7 +293,7 @@ public final class Preconditions {
 
 		return builder.toString();
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	/** Private constructor to prevent instantiation */

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
new file mode 100644
index 0000000..d476e24
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
@@ -0,0 +1,26 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+# We are using this file as a workaround for the Kafka and ZK SASL implementations
+# since they explicitly look for the java.security.auth.login.config property.
+# The file itself is not used by the application since the internal implementation
+# uses a process-wide in-memory Java security configuration object.
+# Please do not edit/delete this file - See FLINK-3929
+sample {
+  useKeyTab=false
+  useTicketCache=true;
+};
\ No newline at end of file
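
For comparison, a hand-maintained static JAAS file (which the in-memory mechanism above
replaces) would conventionally contain a real login module entry. A keytab-based sketch for
the Kafka client context, with placeholder path and principal:

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/etc/security/keytabs/flink.keytab"
      principal="flink-user@EXAMPLE.COM";
    };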

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 27fd84a..c876922 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -139,6 +139,31 @@ jobmanager.web.port: 8081
 #
 # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
 #
+
 # high-availability: zookeeper
 # high-availability.zookeeper.quorum: localhost:2181
 # high-availability.zookeeper.storageDir: hdfs:///flink/ha/
+
+#==============================================================================
+# Flink Cluster Security Configuration (optional configuration)
+#==============================================================================
+
+# Kerberos security for the connectors can be enabled by providing the configurations below
+# Security works in two modes - a keytab/principal combination or the Kerberos ticket cache
+# If keytab and principal are not provided, the ticket cache (manual kinit) will be used
+
+#security.keytab: /path/to/kerberos/keytab
+#security.principal: flink-user
+
+#==============================================================================
+# ZK Security Configuration (optional configuration)
+#==============================================================================
+# Below configurations are applicable if ZK quorum is configured for Kerberos security
+
+# SASL authentication is disabled by default and can be enabled by changing the value to false
+#
+# zookeeper.sasl.disable: true
+
+# Override the below configuration to provide a custom ZK service name if configured
+#
+# zookeeper.sasl.service-name: zookeeper
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 8fb6af4..5ec39c2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -582,10 +582,11 @@ public class MesosApplicationMasterRunner {
 		// build the launch command
 		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
 		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
+		boolean hasKrb5 = false;
 
 		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
 			flinkConfig, tmParams.containeredParameters(), ".", ".",
-			hasLogback, hasLog4j, taskManagerMainClass);
+			hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
 		cmd.setValue(launchCommand);
 
 		// build the environment variables

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index c9748cb..d844f5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -308,6 +308,7 @@ public class BootstrapTools {
 			String logDirectory,
 			boolean hasLogback,
 			boolean hasLog4j,
+			boolean hasKrb5,
 			Class<?> mainClass) {
 
 		StringBuilder tmCommand = new StringBuilder("$JAVA_HOME/bin/java");
@@ -328,6 +329,12 @@ public class BootstrapTools {
 				tmCommand.append(" -Dlog4j.configuration=file:")
 						.append(configDirectory).append("/log4j.properties");
 			}
+
+			//applicable only for YarnMiniCluster secure test run
+			//krb5.conf file will be available as local resource in JM/TM container
+			if(hasKrb5) {
+				tmCommand.append(" -Djava.security.krb5.conf=krb5.conf");
+			}
 		}
 
 		tmCommand.append(' ').append(mainClass.getName());
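
The Mesos runner above passes hasKrb5=false; a hypothetical secure-test caller that ships a
krb5.conf into the container as a local resource would flip the flag, e.g. (variable names
are illustrative):

    // krb5.conf is only present when staged as a container-local resource
    boolean hasKrb5 = new File(workingDirectory, "krb5.conf").exists();

    String launchCommand = BootstrapTools.getTaskManagerShellCommand(
        flinkConfig, containeredParameters, ".", ".",
        hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);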

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
new file mode 100644
index 0000000..c4527dd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ * JAAS configuration provider object that provides a default LoginModule for the various connectors that support
+ * JAAS/SASL based Kerberos authentication. The implementation is inspired by the Hadoop UGI class.
+ *
+ * Different connectors use different login module names to implement JAAS based authentication support.
+ * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expects the
+ * name to be "client". This places the responsibility on the Flink cluster administrator to configure/provide the right
+ * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides
+ * a standard lookup to get the login module entry for the JAAS based authentication to work.
+ *
+ * The HDFS connector is not impacted by this configuration since it uses a UGI based mechanism to authenticate.
+ *
+ * <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html">Configuration</a>
+ *
+ */
+
+@Internal
+public class JaasConfiguration extends Configuration {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class);
+
+	public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
+
+	public static final boolean IBM_JAVA;
+
+	private static final Map<String, String> debugOptions = new HashMap<>();
+
+	private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
+
+	private static final Map<String, String> keytabKerberosOptions = new HashMap<>();
+
+	private static final AppConfigurationEntry userKerberosAce;
+
+	private AppConfigurationEntry keytabKerberosAce = null;
+
+	static {
+
+		IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
+
+		if(LOG.isDebugEnabled()) {
+			debugOptions.put("debug", "true");
+		}
+
+		if(IBM_JAVA) {
+			kerberosCacheOptions.put("useDefaultCcache", "true");
+		} else {
+			kerberosCacheOptions.put("doNotPrompt", "true");
+			kerberosCacheOptions.put("useTicketCache", "true");
+		}
+
+		String ticketCache = System.getenv("KRB5CCNAME");
+		if(ticketCache != null) {
+			if(IBM_JAVA) {
+				System.setProperty("KRB5CCNAME", ticketCache);
+			} else {
+				kerberosCacheOptions.put("ticketCache", ticketCache);
+			}
+		}
+
+		kerberosCacheOptions.put("renewTGT", "true");
+		kerberosCacheOptions.putAll(debugOptions);
+
+		userKerberosAce = new AppConfigurationEntry(
+				KerberosUtil.getKrb5LoginModuleName(),
+				AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
+				kerberosCacheOptions);
+
+	}
+
+	protected JaasConfiguration(String keytab, String principal) {
+
+		LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal);
+
+		if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
+				(!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal))){
+			throw new RuntimeException("Both keytab and principal are required and cannot be empty");
+		}
+
+		if(!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
+
+			if(IBM_JAVA) {
+				keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
+				keytabKerberosOptions.put("credsType", "both");
+			} else {
+				keytabKerberosOptions.put("keyTab", keytab);
+				keytabKerberosOptions.put("doNotPrompt", "true");
+				keytabKerberosOptions.put("useKeyTab", "true");
+				keytabKerberosOptions.put("storeKey", "true");
+			}
+
+			keytabKerberosOptions.put("principal", principal);
+			keytabKerberosOptions.put("refreshKrb5Config", "true");
+			keytabKerberosOptions.putAll(debugOptions);
+
+			keytabKerberosAce = new AppConfigurationEntry(
+					KerberosUtil.getKrb5LoginModuleName(),
+					AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+					keytabKerberosOptions);
+		}
+	}
+
+	public static Map<String, String> getKeytabKerberosOptions() {
+		return keytabKerberosOptions;
+	}
+
+	private static String prependFileUri(String keytabPath) {
+		File f = new File(keytabPath);
+		return f.toURI().toString();
+	}
+
+	@Override
+	public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
+
+		LOG.debug("JAAS configuration requested for the application entry: {}", applicationName);
+
+		AppConfigurationEntry[] appConfigurationEntry;
+
+		if(keytabKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce};
+		} else {
+			appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce};
+		}
+
+		return appConfigurationEntry;
+	}
+
+}
\ No newline at end of file
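
A minimal sketch of how this class is wired in as the process-wide JAAS configuration, which
is what SecurityContext.install() does below. The constructor is not public, so this code runs
from within the org.apache.flink.runtime.security package; keytab and principal are placeholders:

    // Install the Flink JaasConfiguration process-wide; connectors performing
    // SASL/JAAS logins (Kafka, ZooKeeper) will then be served its entries.
    javax.security.auth.login.Configuration.setConfiguration(
        new JaasConfiguration(keytab, principal));

    // Any application name resolves to the same keytab/ticket-cache entries.
    AppConfigurationEntry[] entries = javax.security.auth.login.Configuration
        .getConfiguration().getAppConfigurationEntry("KafkaClient");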

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
new file mode 100644
index 0000000..4b8b69b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+
+/*
+ * Process-wide security context object which initializes UGI with the appropriate security credentials and also
+ * creates an in-memory JAAS configuration object which serves the appropriate AppConfigurationEntry for the
+ * connector login module implementations that authenticate a Kerberos identity using a SASL/JAAS based mechanism.
+ */
+@Internal
+public class SecurityContext {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SecurityContext.class);
+
+	public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
+
+	private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+	private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
+
+	private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+	private static SecurityContext installedContext;
+
+	public static SecurityContext getInstalled() { return installedContext; }
+
+	private UserGroupInformation ugi;
+
+	SecurityContext(UserGroupInformation ugi) {
+		if(ugi == null) {
+			throw new RuntimeException("UGI passed cannot be null");
+		}
+		this.ugi = ugi;
+	}
+
+	public <T> T runSecured(final FlinkSecuredRunner<T> runner) throws Exception {
+		return ugi.doAs(new PrivilegedExceptionAction<T>() {
+			@Override
+			public T run() throws Exception {
+				return runner.run();
+			}
+		});
+	}
+
+	public static void install(SecurityConfiguration config) throws Exception {
+
+		// perform static initialization of UGI, JAAS
+		if(installedContext != null) {
+			LOG.warn("overriding previous security context");
+		}
+
+		// establish the JAAS config
+		JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
+		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+
+		populateSystemSecurityProperties(config.flinkConf);
+
+		// establish the UGI login user
+		UserGroupInformation.setConfiguration(config.hadoopConf);
+
+		UserGroupInformation loginUser;
+
+		if(UserGroupInformation.isSecurityEnabled() &&
+				config.keytab != null && !StringUtils.isBlank(config.principal)) {
+			String keytabPath = (new File(config.keytab)).getAbsolutePath();
+
+			UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
+
+			loginUser = UserGroupInformation.getLoginUser();
+
+			// supplement with any available tokens
+			String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+			if(fileLocation != null) {
+				/*
+				 * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
+				 * used in the context of reading the stored tokens from UGI.
+				 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+				 * loginUser.addCredentials(cred);
+				*/
+				try {
+					Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+							File.class, org.apache.hadoop.conf.Configuration.class);
+					Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null,new File(fileLocation),
+							config.hadoopConf);
+					Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
+							Credentials.class);
+					addCredentialsMethod.invoke(loginUser,cred);
+				} catch(NoSuchMethodException e) {
+					LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+				}
+			}
+		} else {
+			// login with current user credentials (e.g. ticket cache)
+			try {
+				//Use reflection API to get the login user object
+				//UserGroupInformation.loginUserFromSubject(null);
+				Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+				Subject subject = null;
+				loginUserFromSubjectMethod.invoke(null,subject);
+			} catch(NoSuchMethodException e) {
+				LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+			}
+
+			loginUser = UserGroupInformation.getLoginUser();
+			// note that the stored tokens are read automatically
+		}
+
+		boolean delegationToken = false;
+		final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
+		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+		for(Token<? extends TokenIdentifier> token : usrTok) {
+			final Text id = new Text(token.getIdentifier());
+			LOG.debug("Found user token " + id + " with " + token);
+			if(token.getKind().equals(HDFS_DELEGATION_KIND)) {
+				delegationToken = true;
+			}
+		}
+
+		if(UserGroupInformation.isSecurityEnabled() && !loginUser.hasKerberosCredentials()) {
+			//throw an error in non-yarn deployment if kerberos cache is not available
+			if(!delegationToken) {
+				LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+				throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+			}
+		}
+
+		installedContext = new SecurityContext(loginUser);
+	}
+
+	/*
+	 * This method configures some of the system properties that are required for ZK and Kafka SASL authentication
+	 * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+	 * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
+	 * In this method, the java.security.auth.login.config property is set only to support the current ZK and
+	 * Kafka code behavior.
+	 */
+	private static void populateSystemSecurityProperties(Configuration configuration) {
+
+		//required to be empty for Kafka but we will override the property
+		//with pseudo JAAS configuration file if SASL auth is enabled for ZK
+		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+
+		if(configuration == null) {
+			return;
+		}
+
+		boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
+				ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
+		if(disableSaslClient) {
+			LOG.info("SASL client auth for ZK will be disabled");
+			//SASL auth is disabled by default but will be enabled if specified in configuration
+			System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
+			return;
+		}
+
+		String baseDir = configuration.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
+		if(baseDir == null) {
+			String message = "SASL auth is enabled for ZK but unable to locate pseudo Jaas config " +
+					"since " + ConfigConstants.FLINK_BASE_DIR_PATH_KEY + " is not provided";
+			LOG.error(message);
+			throw new IllegalConfigurationException(message);
+		}
+
+		File f = new File(baseDir);
+		if(!f.exists() || !f.isDirectory()) {
+			LOG.error("Invalid flink base directory {} configuration provided", baseDir);
+			throw new IllegalConfigurationException("Invalid flink base directory configuration provided");
+		}
+
+		File jaasConfigFile = new File(f, JAAS_CONF_FILENAME);
+
+		if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
+
+			//check if there is a conf directory
+			File confDir = new File(f, "conf");
+			if(!confDir.exists() || !confDir.isDirectory()) {
+				LOG.error("Could not locate " + JAAS_CONF_FILENAME);
+				throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
+			}
+
+			jaasConfigFile = new File(confDir, JAAS_CONF_FILENAME);
+
+			if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
+				LOG.error("Could not locate " + JAAS_CONF_FILENAME);
+				throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
+			}
+		}
+
+		LOG.info("Enabling {} property with pseudo JAAS config file: {}",
+				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile);
+
+		//The ZK client module looks up this configuration to handle SASL.
+		//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
+		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile.getAbsolutePath());
+		System.setProperty(ZOOKEEPER_SASL_CLIENT,"true");
+
+		String zkSaslServiceName = configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
+		if(!StringUtils.isBlank(zkSaslServiceName)) {
+			LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
+			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
+		}
+
+	}
+
+	/**
+	 * Inputs for establishing the security context.
+	 */
+	public static class SecurityConfiguration {
+
+		Configuration flinkConf;
+
+		org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+
+		String keytab;
+
+		String principal;
+
+		public String getKeytab() {
+			return keytab;
+		}
+
+		public String getPrincipal() {
+			return principal;
+		}
+
+		public SecurityConfiguration setFlinkConfiguration(Configuration flinkConf) {
+
+			this.flinkConf = flinkConf;
+
+			String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+
+			String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+
+			validate(keytab, principal);
+
+			LOG.debug("keytab {} and principal {} .", keytab, principal);
+
+			this.keytab = keytab;
+
+			this.principal = principal;
+
+			return this;
+		}
+
+		public SecurityConfiguration setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
+			this.hadoopConf = conf;
+			return this;
+		}
+
+		private void validate(String keytab, String principal) {
+
+			if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
+					!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal)) {
+				if(StringUtils.isBlank(keytab)) {
+					LOG.warn("Keytab is null or empty");
+				}
+				if(StringUtils.isBlank(principal)) {
+					LOG.warn("Principal is null or empty");
+				}
+				throw new RuntimeException("Requires both keytab and principal to be provided");
+			}
+
+			if(!StringUtils.isBlank(keytab)) {
+				File keytabFile = new File(keytab);
+				if(!keytabFile.exists() || !keytabFile.isFile()) {
+					LOG.warn("Not a valid keytab: {} file", keytab);
+					throw new RuntimeException("Invalid keytab file: " + keytab + " passed");
+				}
+			}
+
+		}
+	}
+
+	public interface FlinkSecuredRunner<T> {
+		T run() throws Exception;
+	}
+
+}
\ No newline at end of file
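
Once installed, any privileged action can be funneled through runSecured(). A sketch, assuming
the context has been installed as above and `hadoopConf` is a final
org.apache.hadoop.conf.Configuration:

    // Access HDFS under the installed Kerberos identity (inside UGI.doAs()).
    FileSystem fs = SecurityContext.getInstalled().runSecured(
        new SecurityContext.FlinkSecuredRunner<FileSystem>() {
            @Override
            public FileSystem run() throws Exception {
                return FileSystem.get(hadoopConf);
            }
        });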

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9c844ba..639c158 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -72,8 +72,8 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
-import org.apache.flink.runtime.security.SecurityUtils
-import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
+import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
+import org.apache.flink.runtime.security.SecurityContext
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -2062,26 +2062,18 @@ object JobManager {
     }
 
     // run the job manager
+    SecurityContext.install(new SecurityConfiguration().setFlinkConfiguration(configuration))
+
     try {
-      if (SecurityUtils.isSecurityEnabled) {
-        LOG.info("Security is enabled. Starting secure JobManager.")
-        SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
-          override def run(): Unit = {
-            runJobManager(
-              configuration,
-              executionMode,
-              listeningHost,
-              listeningPortRange)
-          }
-        })
-      } else {
-        LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
-        runJobManager(
-          configuration,
-          executionMode,
-          listeningHost,
-          listeningPortRange)
-      }
+      SecurityContext.getInstalled.runSecured(new FlinkSecuredRunner[Unit] {
+        override def run(): Unit = {
+          runJobManager(
+            configuration,
+            executionMode,
+            listeningHost,
+            listeningPortRange)
+        }
+      })
     } catch {
       case t: Throwable =>
         LOG.error("Failed to run JobManager.", t)

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 63a64a0..8534ee1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -71,8 +71,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateRegistry
 import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer}
-import org.apache.flink.runtime.security.SecurityUtils
-import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
+import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
+import org.apache.flink.runtime.security.SecurityContext
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{MathUtils, NetUtils}
@@ -1521,19 +1521,14 @@ object TaskManager {
     val resourceId = ResourceID.generate()
 
     // run the TaskManager (if requested in an authentication enabled context)
+    SecurityContext.install(new SecurityConfiguration().setFlinkConfiguration(configuration))
+
     try {
-      if (SecurityUtils.isSecurityEnabled) {
-        LOG.info("Security is enabled. Starting secure TaskManager.")
-        SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
-          override def run(): Unit = {
-            selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
-          }
-        })
-      }
-      else {
-        LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
-        selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
-      }
+      SecurityContext.getInstalled.runSecured(new FlinkSecuredRunner[Unit] {
+        override def run(): Unit = {
+          selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
+        }
+      })
     }
     catch {
       case t: Throwable =>
@@ -1588,6 +1583,8 @@ object TaskManager {
       }
     }
 
+    conf.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, cliConfig.getConfigDir() + "/..")
+
     conf
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
new file mode 100644
index 0000000..89e5ef9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link JaasConfiguration}.
+ */
+public class JaasConfigurationTest {
+
+	@Test
+	public void testInvalidKerberosParams() {
+		String keytab = "user.keytab";
+		String principal = null;
+		try {
+			new JaasConfiguration(keytab, principal);
+			fail("Expected a RuntimeException since the principal is missing");
+		} catch(RuntimeException re) {
+			assertEquals("Both keytab and principal are required and cannot be empty", re.getMessage());
+		}
+	}
+
+	@Test
+	public void testDefaultAceEntry() {
+		JaasConfiguration conf = new JaasConfiguration(null,null);
+		javax.security.auth.login.Configuration.setConfiguration(conf);
+		final AppConfigurationEntry[] entry = conf.getAppConfigurationEntry("test");
+		AppConfigurationEntry ace = entry[0];
+		assertEquals(ace.getLoginModuleName(), KerberosUtil.getKrb5LoginModuleName());
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
new file mode 100644
index 0000000..5f3d76a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SecurityContext}.
+ */
+public class SecurityContextTest {
+
+	@Test
+	public void testCreateInsecureHadoopCtx() {
+		SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+		try {
+			SecurityContext.install(sc);
+			assertEquals(UserGroupInformation.getLoginUser().getUserName(),getOSUserName());
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testInvalidUGIContext() {
+		try {
+			new SecurityContext(null);
+			fail("Expected a RuntimeException since the UGI is null");
+		} catch (RuntimeException re) {
+			assertEquals("UGI passed cannot be null", re.getMessage());
+		}
+	}
+
+
+	private String getOSUserName() throws Exception {
+		String userName = "";
+		String osName = System.getProperty( "os.name" ).toLowerCase();
+		String className = null;
+
+		if( osName.contains( "windows" ) ){
+			className = "com.sun.security.auth.module.NTSystem";
+		}
+		else if( osName.contains( "linux" ) ){
+			className = "com.sun.security.auth.module.UnixSystem";
+		}
+		else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){
+			className = "com.sun.security.auth.module.SolarisSystem";
+		}
+
+		if( className != null ){
+			Class<?> c = Class.forName( className );
+			Method method = c.getDeclaredMethod( "getUsername" );
+			Object o = c.newInstance();
+			userName = (String) method.invoke( o );
+		}
+		return userName;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
index 5712856..edf299d 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
@@ -121,6 +121,43 @@ under the License.
 			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
+	<build>
+		<plugins>
+
+			<!--
+				https://issues.apache.org/jira/browse/DIRSHARED-134
+				Required to pull in the Mini-KDC transitive dependencies
+			-->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<!--
+					Enforce single threaded execution to avoid port conflicts when running
+					secure mini DFS cluster
+					-->
+					<forkCount>1</forkCount>
+					<reuseForks>false</reuseForks>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
 </project>
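
The hadoop-minikdc dependency above backs the SecureTestEnvironment used by the secured IT
case. A minimal, self-contained sketch of what an embedded KDC setup typically looks like
with that library (the work directory, principal names, and keytab file name are
illustrative only, not the values SecureTestEnvironment uses):

    import java.io.File;
    import java.util.Properties;
    import org.apache.hadoop.minikdc.MiniKdc;

    public class MiniKdcSketch {
        public static void main(String[] args) throws Exception {
            Properties kdcConf = MiniKdc.createConf();
            File workDir = new File(System.getProperty("java.io.tmpdir"), "minikdc-work"); // illustrative dir
            workDir.mkdirs();

            MiniKdc kdc = new MiniKdc(kdcConf, workDir);
            kdc.start();
            try {
                // a single keytab file may hold several principals
                File keytab = new File(workDir, "test.keytab");
                kdc.createPrincipal(keytab, "client", "hadoop/localhost");
                System.out.println("principal: client@" + kdc.getRealm() + ", keytab: " + keytab);
            } finally {
                kdc.stop();
            }
        }
    }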

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 7ee75c1..c3c8df5 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -57,6 +58,8 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -79,19 +82,24 @@ import java.util.Map;
 @Deprecated
 public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
+	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkITCase.class);
+
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
 
-	private static MiniDFSCluster hdfsCluster;
-	private static org.apache.hadoop.fs.FileSystem dfs;
-	private static String hdfsURI;
+	protected static MiniDFSCluster hdfsCluster;
+	protected static org.apache.hadoop.fs.FileSystem dfs;
+	protected static String hdfsURI;
+	protected static Configuration conf = new Configuration();
 
+	protected static File dataDir;
 
 	@BeforeClass
 	public static void createHDFS() throws IOException {
-		Configuration conf = new Configuration();
 
-		File dataDir = tempFolder.newFolder();
+		LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");
+
+		dataDir = tempFolder.newFolder();
 
 		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
 		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
@@ -106,6 +114,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 	@AfterClass
 	public static void destroyHDFS() {
+		LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster ");
 		hdfsCluster.shutdown();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
new file mode 100644
index 0000000..86cedaf
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+
+/**
+ * Tests for running {@link RollingSink} against a secure (Kerberos) MiniDFS cluster.
+ * Extends {@link RollingSinkITCase} and reuses its test bodies.
+ */
+//The test is disabled since a secure MiniDFS run requires binding to privileged
+//(low-numbered) ports. We can enable the test once the fix (HDFS-9213) is available.
+@Ignore
+public class RollingSinkSecuredITCase extends RollingSinkITCase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
+
+	/*
+	 * Redefine the superclass static lifecycle methods as no-ops so that MiniDFS and the
+	 * mini Flink cluster are not created with insecure configurations or in the wrong order.
+	 */
+	@BeforeClass
+	public static void setup() throws Exception {}
+
+	@AfterClass
+	public static void teardown() throws Exception {}
+
+	@BeforeClass
+	public static void createHDFS() throws IOException {}
+
+	@AfterClass
+	public static void destroyHDFS() {}
+
+	@BeforeClass
+	public static void startSecureCluster() throws Exception {
+
+		LOG.info("starting secure cluster environment for testing");
+
+		dataDir = tempFolder.newFolder();
+
+		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+
+		SecureTestEnvironment.prepare(tempFolder);
+
+		populateSecureConfigurations();
+
+		Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+				SecureTestEnvironment.getTestKeytab());
+		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+				SecureTestEnvironment.getHadoopServicePrincipal());
+
+		SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
+		ctx.setFlinkConfiguration(flinkConfig);
+		ctx.setHadoopConfiguration(conf);
+		try {
+			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
+		} catch (Exception e) {
+			throw new RuntimeException("Exception occurred while setting up secure test context.", e);
+		}
+
+		File hdfsSiteXML = new File(dataDir.getAbsolutePath() + "/hdfs-site.xml");
+
+		FileWriter writer = new FileWriter(hdfsSiteXML);
+		conf.writeXml(writer);
+		writer.flush();
+		writer.close();
+
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath());
+		TestBaseUtils.setEnv(map);
+
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+		builder.checkDataNodeAddrConfig(true);
+		builder.checkDataNodeHostConfig(true);
+		hdfsCluster = builder.build();
+
+		dfs = hdfsCluster.getFileSystem();
+
+		hdfsURI = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+				+ "/";
+
+		startSecureFlinkClusterWithRecoveryModeEnabled();
+	}
+
+	@AfterClass
+	public static void teardownSecureCluster() throws Exception {
+		LOG.info("tearing down secure cluster environment");
+
+		TestStreamEnvironment.unsetAsContext();
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+
+		hdfsCluster.shutdown();
+
+		SecureTestEnvironment.cleanup();
+	}
+
+	private static void populateSecureConfigurations() {
+
+		String dataTransferProtection = "authentication";
+
+		SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+		conf.set(DFS_NAMENODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+		conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
+		conf.set(DFS_DATANODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+		conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
+		conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+
+		conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+
+		conf.set("dfs.data.transfer.protection", dataTransferProtection);
+
+		conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name());
+
+		conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "false");
+
+		conf.setInt("dfs.datanode.socket.write.timeout", 0);
+
+		/*
+		 * We are setting the data node ports to privileged ports - see HDFS-9213.
+		 * Binding to them requires root privileges. If the java process does not
+		 * run as root, use the command below (Ubuntu) to grant it the capability
+		 * needed for bind() to succeed:
+		 * setcap 'cap_net_bind_service=+ep' /path/to/java
+		 */
+		conf.set(DFS_DATANODE_ADDRESS_KEY, "localhost:1002");
+		conf.set(DFS_DATANODE_HOST_NAME_KEY, "localhost");
+		conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
+	}
+
+	private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
+		try {
+			LOG.info("Starting Flink and ZK in secure mode");
+
+			dfs.mkdirs(new Path("/flink/checkpoints"));
+			dfs.mkdirs(new Path("/flink/recovery"));
+
+			org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
+
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+			config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+			config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+			config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "flink/checkpoints");
+			config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, hdfsURI + "flink/recovery");
+			config.setString("state.backend.fs.checkpointdir", hdfsURI + "flink/checkpoints");
+
+			SecureTestEnvironment.populateFlinkSecureConfigurations(config);
+
+			cluster = TestBaseUtils.startCluster(config, false);
+			TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
+
+		} catch (Exception e) {
+			LOG.error("Exception occured while creating MiniFlink cluster. Reason: {}", e);
+			throw new RuntimeException(e);
+		}
+	}
+
+	/* For secure cluster testing it is enough to run a single test; the test methods
+	 * below are overridden with empty bodies to keep the overall build time minimal.
+	 */
+	@Override
+	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {}
+
+	@Override
+	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {}
+
+	@Override
+	public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {}
+
+	@Override
+	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {}
+
+	@Override
+	public void testDateTimeRollingStringWriter() throws Exception {}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
index fe60d94..5c22851 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
@@ -25,3 +25,5 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index fc13719..5c951db 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
-
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 864773a..cbf3d06 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -81,6 +81,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public Properties getSecureProperties() {
+		return null;
+	}
+
+	@Override
 	public String getVersion() {
 		return "0.8";
 	}
@@ -132,9 +137,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		return server.socketServer().brokerId();
 	}
 
+	@Override
+	public boolean isSecureRunSupported() {
+		return false;
+	}
+
 
 	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
 		this.additionalServerProperties = additionalServerProperties;
 		File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
@@ -210,6 +220,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		if (zookeeper != null) {
 			try {
 				zookeeper.stop();
+				zookeeper.close();
 			}
 			catch (Exception e) {
 				LOG.warn("ZK.stop() failed", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index 3b31de6..bfcde82 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -134,6 +134,13 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minikdc</artifactId>
+			<version>${minikdc.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>
@@ -180,6 +187,17 @@ under the License.
 					<argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
 				</configuration>
 			</plugin>
+			<!--
+				https://issues.apache.org/jira/browse/DIRSHARED-134
+				Required to pull in the Mini-KDC transitive dependencies
+			-->
+			<plugin>
+				<groupId>org.apache.felix</groupId>
+				<artifactId>maven-bundle-plugin</artifactId>
+				<version>3.0.1</version>
+				<inherited>true</inherited>
+				<extensions>true</extensions>
+			</plugin>
 		</plugins>
 	</build>
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 957833d..16ddcdc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -21,12 +21,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.junit.Test;
 
+import java.util.Properties;
 import java.util.UUID;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
 public class Kafka09ITCase extends KafkaConsumerTestBase {
 
 	// ------------------------------------------------------------------------
@@ -131,11 +131,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	public void testJsonTableSource() throws Exception {
 		String topic = UUID.randomUUID().toString();
 
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
 		// Names and types are determined in the actual test method of the
 		// base test class.
 		Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
 				topic,
-				standardProps,
+				props,
 				new String[] {
 						"long",
 						"string",
@@ -159,11 +163,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	public void testJsonTableSourceWithFailOnMissingField() throws Exception {
 		String topic = UUID.randomUUID().toString();
 
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+
 		// Names and types are determined in the actual test method of the
 		// base test class.
 		Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
 				topic,
-				standardProps,
+				props,
 				new String[] {
 						"long",
 						"string",

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index 1288347..ae4f5b2 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
-
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
new file mode 100644
index 0000000..d12ec65
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Kafka secure connection (Kerberos) IT case.
+ */
+public class Kafka09SecureRunITCase extends KafkaConsumerTestBase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecureRunITCase.class);
+
+	@BeforeClass
+	public static void prepare() throws IOException, ClassNotFoundException {
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting Kafka09SecureRunITCase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		SecureTestEnvironment.prepare(tempFolder);
+		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+		startClusters(true);
+	}
+
+	@AfterClass
+	public static void shutDownServices() {
+		shutdownClusters();
+		SecureTestEnvironment.cleanup();
+	}
+
+
+	//The timeout interval is large since ZK connection timeouts occur frequently on Travis.
+	//The test case timeout is set to twice the ZK connection timeout.
+	@Test(timeout = 600000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+}
\ No newline at end of file
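
To place the test in context: a Flink job consuming from such a secured broker only needs
the SASL client settings merged into its consumer properties, with JAAS/krb5 configured on
the JVM (which SecureTestEnvironment handles in the tests). A hedged sketch - the topic,
bootstrap servers, and group id below are placeholders:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9093");      // placeholder
    props.setProperty("group.id", "secure-test");               // placeholder
    props.setProperty("security.protocol", "SASL_PLAINTEXT");
    props.setProperty("sasl.kerberos.service.name", "kafka");

    FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
            "myTopic", new SimpleStringSchema(), props);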