Posted to commits@flink.apache.org by se...@apache.org on 2017/01/11 20:19:45 UTC

[7/8] flink git commit: [FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries

[FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries

Fixes FLINK-5364, FLINK-5361, FLINK-5350, FLINK-5055

This closes #3057


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc3a778c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc3a778c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc3a778c

Branch: refs/heads/master
Commit: fc3a778c0cafe1adc9efbd8796a8bd64122e4ad2
Parents: 03b62ae
Author: wrighe3 <er...@emc.com>
Authored: Tue Dec 20 01:07:38 2016 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 20:17:44 2017 +0100

----------------------------------------------------------------------
 docs/internals/flink_security.md                | 133 +++++---
 docs/setup/config.md                            |  55 +++-
 .../connectors/fs/RollingSinkSecuredITCase.java |   8 +-
 .../flink/configuration/ConfigConstants.java    |  14 -
 .../configuration/HighAvailabilityOptions.java  |   8 -
 .../flink/configuration/SecurityOptions.java    |  62 ++++
 flink-dist/src/main/resources/flink-conf.yaml   |  21 +-
 .../java/hadoop/mapred/utils/HadoopUtils.java   |  21 ++
 .../MesosApplicationMasterRunner.java           |   2 -
 .../MesosTaskManagerRunner.java                 |   2 -
 .../overlays/KeytabOverlay.java                 |  14 +-
 .../runtime/security/DynamicConfiguration.java  | 111 +++++++
 .../runtime/security/JaasConfiguration.java     | 160 ---------
 .../flink/runtime/security/KerberosUtils.java   | 125 +++++++
 .../flink/runtime/security/SecurityUtils.java   | 322 ++++++++-----------
 .../runtime/security/modules/HadoopModule.java  | 119 +++++++
 .../runtime/security/modules/JaasModule.java    | 146 +++++++++
 .../security/modules/SecurityModule.java        |  59 ++++
 .../security/modules/ZooKeeperModule.java       |  76 +++++
 .../src/main/resources/flink-jaas.conf          |   9 +-
 .../overlays/KeytabOverlayTest.java             |   5 +-
 .../runtime/security/JaasConfigurationTest.java |  52 ---
 .../runtime/security/KerberosUtilsTest.java     |  48 +++
 .../runtime/security/SecurityUtilsTest.java     | 105 +++---
 .../flink/test/util/SecureTestEnvironment.java  |  48 +--
 .../test/util/TestingJaasConfiguration.java     | 106 ------
 .../flink/test/util/TestingSecurityContext.java |  38 +--
 .../yarn/YARNSessionFIFOSecuredITCase.java      |  10 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   5 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  26 +-
 ...bstractYarnFlinkApplicationMasterRunner.java |  25 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  31 +-
 .../flink/yarn/YarnTaskExecutorRunner.java      |  25 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  25 +-
 34 files changed, 1219 insertions(+), 797 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
index 846273b..a83f3b9 100644
--- a/docs/internals/flink_security.md
+++ b/docs/internals/flink_security.md
@@ -24,64 +24,123 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN) 
-and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers 
-who plans to run Flink on a secure environment.
+This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos), 
+filesystems, connectors, and state backends.
 
 ## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase) 
 
-The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario, 
-streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be  able to authenticate against secure 
-data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the 
-context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster.
+In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure 
+data sources throughout the life of the job.  Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens.   Keep in mind that all jobs share the credential configured for a given cluster.   To use a different keytab
+for a certain job, simply launch a separate Flink cluster with a different configuration.   Numerous Flink clusters may run side-by-side in a YARN
+or Mesos environment.
 
 ## How Flink Security works
-Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. 
-A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security 
-requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolves over a period 
-of time, at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security.
+In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.).  While satisfying the security requirements for all connectors is an ongoing effort,
+Flink provides first-class support for Kerberos authentication only.  The following services and connectors are tested for Kerberos authentication:
 
-- Kafka (0.9)
+- Kafka (0.9+)
 - HDFS
+- HBase
 - ZooKeeper
 
-Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation
-(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context.
+Note that it is possible to enable the use of Kerberos independently for each service or connector.  For example, the user may enable 
+Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa.    The shared element is the configuration of 
+Kerberos credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup.  The next section describes each security module.
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context.   The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
+
+If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured.  Otherwise,
+the login user conveys only the user identity of the OS account that launched the cluster.
+
+### JAAS Security Module
+This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
+Kafka, and other such components that rely on JAAS.
+
+Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html).   Static entries override any
+dynamic entries provided by this module.
+
+### ZooKeeper Security Module
+This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
+and the JAAS login context name (default: `Client`).
+
+## Security Configuration
+
+### Flink Configuration
+The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
 
-Services like Kafka and ZooKeeper use SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. It expects JAAS configuration with a platform-specific login 
-module *name* to be provided. Managing per-connector configuration files will be an overhead and to overcome this requirement, a process-wide JAAS configuration object is 
-instantiated which serves standard ApplicationConfigurationEntry for the connectors that authenticates using SASL/JAAS mechanism.
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
 
-It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself uses UGI's doAS() implementation to run under a specific user context, i.e. if Hadoop security is enabled 
-then the Flink processes will be running under a secure user account or else it will run as the OS login user account who starts the Flink cluster.
+A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
 
-## Security Configurations
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
 
-Secure credentials can be supplied by adding below configuration elements to Flink configuration file:
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
 
-- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret.
+These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context.  Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section).   To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
 
-- `security.principal`: User principal name that the Flink cluster should run as.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
 
-The delegation token mechanism (*kinit cache*) is still supported for backward compatibility but enabling security using *keytab* configuration is the preferred and recommended approach.
+ZooKeeper-related configuration overrides:
 
-## Standalone Mode:
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.
+
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
+
+### Hadoop Configuration
+
+The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`).   The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
+
+Note that Kerberos credentials found in the ticket cache aren't transferable to other hosts.   In this scenario, the Flink CLI acquires Hadoop
+delegation tokens (for HDFS and for HBase).
+
+## Deployment Modes
+Here is some information specific to each deployment mode.
+
+### Standalone Mode
 
 Steps to run a secure Flink cluster in standalone/cluster mode:
-- Add security configurations to Flink configuration file (on all cluster nodes) 
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration on all cluster nodes
-- Deploy Flink cluster using cluster start/stop scripts or CLI
+1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
+3. Deploy Flink cluster as normal.
+
+### YARN/Mesos Mode
+
+Steps to run a secure Flink cluster in YARN/Mesos mode:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on the client node.
+3. Deploy Flink cluster as normal.
+
+In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.
 
-## Yarn Mode:
+For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
 
-Steps to run secure Flink cluster in Yarn mode:
-- Add security configurations to Flink configuration file (on the node from where cluster will be provisioned using Flink/Yarn CLI) 
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration
-- Deploy Flink cluster using CLI
+#### Using `kinit` (YARN only)
 
-In Yarn mode, the user supplied keytab will be copied over to the Yarn containers (App Master/JM and TM) as the Yarn local resource file.
-Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">Yarn security</a> 
+In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
+This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it.  The main drawback is
+that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
 
-## Token Renewal
+Steps to run a secure Flink cluster using `kinit`:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Log in using the `kinit` command.
+3. Deploy Flink cluster as normal.
 
-UGI and Kafka/ZK login module implementations takes care of auto-renewing the tickets upon reaching expiry and no further action is needed on the part of Flink.
\ No newline at end of file
+## Further Details
+### Ticket Renewal
+Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
+Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab.  In the delegation token scenario,
+YARN itself renews the token (up to its maximum lifespan).
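
Illustration (not part of the commit): the JAAS Security Module section above states that static JAAS entries override the dynamic entries provided by the module. A minimal sketch of that precedence rule, using only the JDK's `javax.security.auth.login` API, could look as follows. The class `LayeredJaasConfiguration` and its method names are hypothetical; this is not the `DynamicConfiguration` class added by this commit, and the `Krb5LoginModule` options chosen here are an assumption about a typical keytab login.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical sketch: a JAAS Configuration that consults a static
// configuration first and falls back to dynamically registered entries.
class LayeredJaasConfiguration extends Configuration {

    // Static configuration (e.g. loaded from a JAAS file); may be null.
    private final Configuration staticConfig;

    // Dynamic entries keyed by login context name (e.g. "Client").
    private final Map<String, AppConfigurationEntry[]> dynamicEntries = new HashMap<>();

    LayeredJaasConfiguration(Configuration staticConfig) {
        this.staticConfig = staticConfig;
    }

    // Registers a keytab-based Kerberos entry for the given login context.
    void addKerberosEntry(String contextName, String keytabPath, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("keyTab", keytabPath);
        options.put("principal", principal);
        options.put("storeKey", "true");
        options.put("doNotPrompt", "true");
        AppConfigurationEntry entry = new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                options);
        dynamicEntries.put(contextName, new AppConfigurationEntry[] {entry});
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        // Static entries win if present.
        if (staticConfig != null) {
            AppConfigurationEntry[] entries = staticConfig.getAppConfigurationEntry(name);
            if (entries != null && entries.length > 0) {
                return entries;
            }
        }
        // Otherwise fall back to the dynamic entry; null means "no such context".
        return dynamicEntries.get(name);
    }
}
```

Such a configuration would typically be installed process-wide with `Configuration.setConfiguration(...)`, so that ZooKeeper and Kafka clients resolve their login contexts through it.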

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 724fe33..1b2be8a 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -96,37 +96,58 @@ These options are useful for debugging a Flink application for memory and garbag
 
 - `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
 
-### Kerberos
+### Kerberos-based Security
 
-Flink supports Kerberos authentication for the following services
+Flink supports Kerberos authentication for the following services:
 
-+ Hadoop Components: such as HDFS, YARN, or HBase.
++ Hadoop Components (such as HDFS, YARN, or HBase)
 + Kafka Connectors (version 0.9+)
-+ Zookeeper Server/Client
++ ZooKeeper
 
-Hadoop components relies on the UserGroupInformation (UGI) implementation to handle Kerberos authentication, whereas Kafka and Zookeeper services handles Kerberos authentication through SASL/JAAS implementation.
-
-**Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
+**Kerberos is supported only in Hadoop version 2.6.1 and above. All
   other versions have critical bugs which might fail the Flink job
   unexpectedly.**
 
-**Ticket cache** and **Keytab** modes are supported for all above mentioned services.
+Configuring Flink for Kerberos security involves three aspects:
+
+1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via `kinit`)
+2. Making the Kerberos credential available to components and connectors as needed
+3. Configuring the component and/or connector to use Kerberos authentication
+
+To provide the cluster with a Kerberos credential, either configure the login keytab using the below configuration options,
+or log in using `kinit` before starting the cluster.
+
+It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues.   If you prefer to use the ticket cache,
+talk to your administrator about increasing the Hadoop delegation token lifetime.
+
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from your Kerberos ticket cache (default: `true`).
+
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
 
-> Ticket cache (Supported only to provide backward compatibility support. Keytab is the preferred approach for long running jobs)
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
 
-While Hadoop uses Kerberos tickets to authenticate users with services initially, the authentication process continues differently afterwards. Instead of saving the ticket to authenticate on a later access, Hadoop creates its own security tokens (DelegationToken) that it passes around. These are authenticated to Kerberos periodically but are independent of the token renewal time. The tokens have a maximum life span identical to the Kerberos ticket maximum life span.
+If Hadoop security is enabled (in `core-site.xml`), Flink will automatically use the configured Kerberos credentials when connecting to HDFS, HBase, and other Hadoop components.
 
-While using ticket cache mode, please make sure to set the maximum ticket life span high long running jobs.
+Make the Kerberos credentials available to any connector or component that uses a JAAS configuration file by configuring JAAS login contexts.
 
-If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication).
+
+You may also provide a static JAAS configuration file, whose entries override those produced by the above configuration option.
+
+Be sure to configure the connector within your Flink program as necessary to use Kerberos authentication.  For the Kafka connector,
+use the following properties:
+
+```
+security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+sasl.kerberos.service.name=kafka
+```
 
-> Keytab (security principal and keytab can be configured through Flink configuration file)
-- `security.keytab`: Path to Keytab file
-- `security.principal`: Principal associated with the keytab
+Flink provides some additional options to configure ZooKeeper security:
 
-Kerberos ticket renewal is abstracted and automatically handled by the Hadoop/Kafka/ZK login modules and ensures that tickets are renewed in time and you can be sure to be authenticated until the end of the ticket life time.
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`).
 
-For Kafka and ZK, process-wide JAAS config will be created using the provided security credentials and the Kerberos authentication will be handled by Kafka/ZK login handlers.
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
 
 ### Other
 

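Illustration (not part of the commit): the Kafka connector properties listed in the `docs/setup/config.md` change above might be assembled in a Flink program roughly as follows. This is a minimal sketch using only `java.util.Properties`; the helper class `KafkaKerberosProperties` and its parameters are hypothetical, and the service name `kafka` is simply the value shown in the documentation, which must match the broker setup.

```java
import java.util.Properties;

// Hypothetical helper that builds client properties for a Kerberos-secured Kafka cluster.
final class KafkaKerberosProperties {

    private KafkaKerberosProperties() {
    }

    // bootstrapServers: broker list, e.g. "broker1:9092" (placeholder value).
    // useSsl: selects SASL_SSL instead of SASL_PLAINTEXT.
    static Properties create(String bootstrapServers, boolean useSsl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", useSsl ? "SASL_SSL" : "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }
}
```

The resulting `Properties` object would be passed to the Kafka consumer or producer; the Kerberos credential itself is expected to come from the `KafkaClient` login context enabled via `security.kerberos.login.contexts`.
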
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index eb12d07..fa46fc7 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SecureTestEnvironment;
@@ -116,13 +117,12 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 		populateSecureConfigurations();
 
 		Configuration flinkConfig = new Configuration();
-		flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
 				SecureTestEnvironment.getTestKeytab());
-		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
 				SecureTestEnvironment.getHadoopServicePrincipal());
 
-		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
-		ctx.setHadoopConfiguration(conf);
+		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig, conf);
 		try {
 			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index eabb754..fc389e0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1395,20 +1395,6 @@ public final class ConfigConstants {
 	/** The environment variable name which contains the Flink installation root directory */
 	public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
 
-	// -------------------------------- Security -------------------------------
-
-	/**
-	 * The config parameter defining security credentials required
-	 * for securing Flink cluster.
-	 */
-
-	/** Keytab file key name to be used in flink configuration file */
-	public static final String SECURITY_KEYTAB_KEY = "security.keytab";
-
-	/** Kerberos security principal key name to be used in flink configuration file */
-	public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
-
-
 	/**
 	 * Not instantiable.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 1ee988a..4792eba 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -124,14 +124,6 @@ public class HighAvailabilityOptions {
 			.defaultValue(3)
 			.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
 
-	public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE = 
-			key("zookeeper.sasl.disable")
-			.defaultValue(true);
-
-	public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME = 
-			key("zookeeper.sasl.service-name")
-			.noDefaultValue();
-
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
new file mode 100644
index 0000000..67d101d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to security.
+ */
+public class SecurityOptions {
+
+	// ------------------------------------------------------------------------
+	//  Kerberos Options
+	// ------------------------------------------------------------------------
+
+	public static final ConfigOption<String> KERBEROS_LOGIN_PRINCIPAL =
+		key("security.kerberos.login.principal")
+			.noDefaultValue()
+			.withDeprecatedKeys("security.principal");
+
+	public static final ConfigOption<String> KERBEROS_LOGIN_KEYTAB =
+		key("security.kerberos.login.keytab")
+			.noDefaultValue()
+			.withDeprecatedKeys("security.keytab");
+
+	public static final ConfigOption<Boolean> KERBEROS_LOGIN_USETICKETCACHE =
+		key("security.kerberos.login.use-ticket-cache")
+			.defaultValue(true);
+
+	public static final ConfigOption<String> KERBEROS_LOGIN_CONTEXTS =
+		key("security.kerberos.login.contexts")
+			.noDefaultValue();
+
+
+	// ------------------------------------------------------------------------
+	//  ZooKeeper Security Options
+	// ------------------------------------------------------------------------
+
+	public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
+		key("zookeeper.sasl.service-name")
+			.defaultValue("zookeeper");
+
+	public static final ConfigOption<String> ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME =
+		key("zookeeper.sasl.login-context-name")
+			.defaultValue("Client");
+}
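
Illustration (not part of the commit): the options above declare `security.principal` and `security.keytab` as deprecated keys, so existing configuration files should keep working. The intended lookup order (new key, then deprecated keys, then the default) can be sketched with plain Java collections. The class `DeprecatedKeyLookup` is hypothetical and is not Flink's `ConfigOption` or `Configuration` implementation.

```java
import java.util.Map;

// Hypothetical sketch of a lookup that honours deprecated configuration keys.
final class DeprecatedKeyLookup {

    private DeprecatedKeyLookup() {
    }

    // Returns the value for 'key' if set; otherwise the first deprecated key
    // that is set; otherwise 'defaultValue' (which may be null).
    static String lookup(
            Map<String, String> config,
            String key,
            String defaultValue,
            String... deprecatedKeys) {
        String value = config.get(key);
        if (value != null) {
            return value;
        }
        for (String deprecatedKey : deprecatedKeys) {
            value = config.get(deprecatedKey);
            if (value != null) {
                return value;
            }
        }
        return defaultValue;
    }
}
```

With this rule, a file that still sets only `security.keytab` resolves `security.kerberos.login.keytab` to the old value, while a file that sets both keys uses the new one.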

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index c650cfe..f759db6 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -162,21 +162,24 @@ jobmanager.web.port: 8081
 # Flink Cluster Security Configuration (optional configuration)
 #==============================================================================
 
-# Kerberos security for the connectors can be enabled by providing below configurations
-# Security works in two modes - keytab/principal combination or using the Kerberos token cache
-# If keytab and principal are not provided, token cache (manual kinit) will be used
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
 
-#security.keytab: /path/to/kerberos/keytab
-#security.principal: flink-user
+#security.kerberos.login.keytab: /path/to/kerberos/keytab
+#security.kerberos.login.principal: flink-user
+#security.kerberos.login.use-ticket-cache: true
+
+#security.kerberos.login.contexts: Client,KafkaClient
 
 #==============================================================================
 # ZK Security Configuration (optional configuration)
 #==============================================================================
-# Below configurations are applicable if ZK quorum is configured for Kerberos security
 
-# SASL authentication is disabled by default and can be enabled by changig the value to false
-#
-# zookeeper.sasl.disable: true
+# Below configurations are applicable if ZK ensemble is configured for security
 
 # Override below configuration to provide custom ZK service name if configured
 #

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index 7c41eaf..da8244f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -21,17 +21,22 @@ package org.apache.flink.api.java.hadoop.mapred.utils;
 
 import java.io.File;
 import java.lang.reflect.Constructor;
+import java.util.Collection;
 import java.util.Map;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +48,8 @@ public final class HadoopUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
 
+	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
 	/**
 	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
 	 */
@@ -163,6 +170,20 @@ public final class HadoopUtils {
 	}
 
 	/**
+	 * Indicates whether the current user has an HDFS delegation token.
+	 */
+	public static boolean hasHDFSDelegationToken() throws Exception {
+		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+		for (Token<? extends TokenIdentifier> token : usrTok) {
+			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
 	 * Private constructor to prevent instantiation.
 	 */
 	private HadoopUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 4b9bd82..689c26a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -27,7 +27,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -168,7 +167,6 @@ public class MesosApplicationMasterRunner {
 
 			// configure security
 			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
-			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
 			SecurityUtils.install(sc);
 
 			// run the actual work in the installed security context

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 75b5043..206c71b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -26,7 +26,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -118,7 +117,6 @@ public class MesosTaskManagerRunner {
 
 		// Run the TM in the security context
 		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
-		sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
 		SecurityUtils.install(sc);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
index 7fe5b3e..271b32d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework.overlays;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.slf4j.Logger;
@@ -34,7 +34,7 @@ import java.io.IOException;
  * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container.
  *
  * The following Flink configuration entries are updated:
- *  - security.keytab
+ *  - security.kerberos.login.keytab
  */
 public class KeytabOverlay extends AbstractContainerOverlay {
 
@@ -60,7 +60,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
 				.setDest(TARGET_PATH)
 				.setCachable(false)
 				.build());
-			container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath());
+			container.getDynamicConfiguration().setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, TARGET_PATH.getPath());
 		}
 	}
 
@@ -69,7 +69,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
 	}
 
 	/**
-	 * A builder for the {@link HadoopUserOverlay}.
+	 * A builder for the {@link KeytabOverlay}.
 	 */
 	public static class Builder {
 
@@ -79,15 +79,15 @@ public class KeytabOverlay extends AbstractContainerOverlay {
 		 * Configures the overlay using the current environment (and global configuration).
 		 *
 		 * The following Flink configuration settings are checked for a keytab:
-		 *  - security.keytab
+		 *  - security.kerberos.login.keytab
 		 */
 		public Builder fromEnvironment(Configuration globalConfiguration) {
-			String keytab = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+			String keytab = globalConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
 			if(keytab != null) {
 				keytabPath = new File(keytab);
 				if(!keytabPath.exists()) {
 					throw new IllegalStateException("Invalid configuration for " +
-						ConfigConstants.SECURITY_KEYTAB_KEY +
+						SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() +
 						"; '" + keytab + "' not found.");
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
new file mode 100644
index 0000000..6af4f23
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A dynamic JAAS configuration.
+ *
+ * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
+ * an (optional) underlying configuration. Entries from the underlying configuration take
+ * precedence over dynamic entries.
+ */
+public class DynamicConfiguration extends Configuration {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(DynamicConfiguration.class);
+
+	private final Configuration delegate;
+
+	private final Map<String,AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
+
+	/**
+	 * Create a dynamic configuration.
+	 * @param delegate an underlying configuration to delegate to, or null.
+     */
+	public DynamicConfiguration(@Nullable Configuration delegate) {
+		this.delegate = delegate;
+	}
+
+	/**
+	 * Add entries for the given application name.
+     */
+	public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) {
+		final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+		final AppConfigurationEntry[] updated;
+		if(existing == null) {
+			updated = Arrays.copyOf(entry, entry.length);
+		}
+		else {
+			updated = merge(existing, entry);
+		}
+		dynamicEntries.put(name, updated);
+	}
+
+	/**
+	 * Retrieve the AppConfigurationEntries for the specified <i>name</i>
+	 * from this Configuration.
+	 *
+	 * <p>
+	 *
+	 * @param name the name used to index the Configuration.
+	 *
+	 * @return an array of AppConfigurationEntries for the specified <i>name</i>
+	 *          from this Configuration, or null if there are no entries
+	 *          for the specified <i>name</i>
+	 */
+	@Override
+	public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+		AppConfigurationEntry[] entry = null;
+		if(delegate != null) {
+			entry = delegate.getAppConfigurationEntry(name);
+		}
+		final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+		if(existing != null) {
+			if(entry != null) {
+				entry = merge(entry, existing);
+			}
+			else {
+				entry = Arrays.copyOf(existing, existing.length);
+			}
+		}
+		return entry;
+	}
+
+	private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) {
+		AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length);
+		System.arraycopy(b, 0, merged, a.length, b.length);
+		return merged;
+	}
+
+	@Override
+	public void refresh() {
+		if(delegate != null) {
+			delegate.refresh();
+		}
+	}
+}
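
Note on the lookup order in DynamicConfiguration: for a given application name, entries from the delegate come first in the returned array, followed by the dynamically added entries. The following standalone sketch reproduces only that merge rule with plain `javax.security.auth.login` types. `FixedConfiguration`, `lookup`, and the `example.*` module names are hypothetical stand-ins for illustration, not part of this commit.

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DynamicJaasSketch {

    /** Hypothetical stand-in for an underlying (e.g. file-based) JAAS configuration. */
    static final class FixedConfiguration extends Configuration {
        private final Map<String, AppConfigurationEntry[]> entries = new HashMap<>();

        void put(String name, AppConfigurationEntry... e) {
            entries.put(name, e);
        }

        @Override
        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
            return entries.get(name);
        }
    }

    /** Builds an entry that only carries a (made-up) login module name. */
    static AppConfigurationEntry entry(String moduleName) {
        return new AppConfigurationEntry(
            moduleName, LoginModuleControlFlag.OPTIONAL, Collections.<String, String>emptyMap());
    }

    /** Same rule as the diff: delegate entries first, then dynamic entries. */
    static AppConfigurationEntry[] lookup(
            Configuration delegate, Map<String, AppConfigurationEntry[]> dynamic, String name) {
        AppConfigurationEntry[] result =
            delegate != null ? delegate.getAppConfigurationEntry(name) : null;
        AppConfigurationEntry[] extra = dynamic.get(name);
        if (extra == null) {
            return result;
        }
        if (result == null) {
            return Arrays.copyOf(extra, extra.length);
        }
        AppConfigurationEntry[] merged = Arrays.copyOf(result, result.length + extra.length);
        System.arraycopy(extra, 0, merged, result.length, extra.length);
        return merged;
    }

    /** Returns the module names found for "KafkaClient", in lookup order. */
    public static String demo() {
        FixedConfiguration base = new FixedConfiguration();
        base.put("KafkaClient", entry("example.FileLoginModule"));

        Map<String, AppConfigurationEntry[]> dynamic = new HashMap<>();
        dynamic.put("KafkaClient",
            new AppConfigurationEntry[] {entry("example.DynamicLoginModule")});

        StringBuilder sb = new StringBuilder();
        for (AppConfigurationEntry e : lookup(base, dynamic, "KafkaClient")) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e.getLoginModuleName());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because JAAS evaluates login modules in array order, placing the delegate's entries first is what lets a user-supplied JAAS file take effect ahead of the entries Flink adds at runtime.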

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
deleted file mode 100644
index c4527dd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.annotation.Internal;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- *
- * JAAS configuration provider object that provides default LoginModule for various connectors that supports
- * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class.
- *
- * Different connectors uses different login module name to implement JAAS based authentication support.
- * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the
- * name to be "client". This sets responsibility on the Flink cluster administrator to configure/provide right
- * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides
- * a standard lookup to get the login module entry for the JAAS based authentication to work.
- *
- * HDFS connector will not be impacted with this configuration since it uses UGI based mechanism to authenticate.
- *
- * <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html">Configuration</a>
- *
- */
-
-@Internal
-public class JaasConfiguration extends Configuration {
-
-	private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class);
-
-	public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
-
-	public static final boolean IBM_JAVA;
-
-	private static final Map<String, String> debugOptions = new HashMap<>();
-
-	private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
-
-	private static final Map<String, String> keytabKerberosOptions = new HashMap<>();
-
-	private static final AppConfigurationEntry userKerberosAce;
-
-	private AppConfigurationEntry keytabKerberosAce = null;
-
-	static {
-
-		IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
-
-		if(LOG.isDebugEnabled()) {
-			debugOptions.put("debug", "true");
-		}
-
-		if(IBM_JAVA) {
-			kerberosCacheOptions.put("useDefaultCcache", "true");
-		} else {
-			kerberosCacheOptions.put("doNotPrompt", "true");
-			kerberosCacheOptions.put("useTicketCache", "true");
-		}
-
-		String ticketCache = System.getenv("KRB5CCNAME");
-		if(ticketCache != null) {
-			if(IBM_JAVA) {
-				System.setProperty("KRB5CCNAME", ticketCache);
-			} else {
-				kerberosCacheOptions.put("ticketCache", ticketCache);
-			}
-		}
-
-		kerberosCacheOptions.put("renewTGT", "true");
-		kerberosCacheOptions.putAll(debugOptions);
-
-		userKerberosAce = new AppConfigurationEntry(
-				KerberosUtil.getKrb5LoginModuleName(),
-				AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
-				kerberosCacheOptions);
-
-	}
-
-	protected JaasConfiguration(String keytab, String principal) {
-
-		LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal);
-
-		if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
-				(!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal))){
-			throw new RuntimeException("Both keytab and principal are required and cannot be empty");
-		}
-
-		if(!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
-
-			if(IBM_JAVA) {
-				keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
-				keytabKerberosOptions.put("credsType", "both");
-			} else {
-				keytabKerberosOptions.put("keyTab", keytab);
-				keytabKerberosOptions.put("doNotPrompt", "true");
-				keytabKerberosOptions.put("useKeyTab", "true");
-				keytabKerberosOptions.put("storeKey", "true");
-			}
-
-			keytabKerberosOptions.put("principal", principal);
-			keytabKerberosOptions.put("refreshKrb5Config", "true");
-			keytabKerberosOptions.putAll(debugOptions);
-
-			keytabKerberosAce = new AppConfigurationEntry(
-					KerberosUtil.getKrb5LoginModuleName(),
-					AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-					keytabKerberosOptions);
-		}
-	}
-
-	public static Map<String, String> getKeytabKerberosOptions() {
-		return keytabKerberosOptions;
-	}
-
-	private static String prependFileUri(String keytabPath) {
-		File f = new File(keytabPath);
-		return f.toURI().toString();
-	}
-
-	@Override
-	public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
-
-		LOG.debug("JAAS configuration requested for the application entry: {}", applicationName);
-
-		AppConfigurationEntry[] appConfigurationEntry;
-
-		if(keytabKerberosAce != null) {
-			appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce};
-		} else {
-			appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce};
-		}
-
-		return appConfigurationEntry;
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
new file mode 100644
index 0000000..7ef9187
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ *
+ * Provides vendor-specific Kerberos {@link AppConfigurationEntry} instances.
+ *
+ * The implementation is inspired by Hadoop's UGI class.
+ */
+@Internal
+public class KerberosUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);
+
+	private static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
+
+	private static final boolean IBM_JAVA;
+
+	private static final Map<String, String> debugOptions = new HashMap<>();
+
+	private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
+
+	private static final AppConfigurationEntry userKerberosAce;
+
+	static {
+
+		IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
+
+		if(LOG.isDebugEnabled()) {
+			debugOptions.put("debug", "true");
+		}
+
+		if(IBM_JAVA) {
+			kerberosCacheOptions.put("useDefaultCcache", "true");
+		} else {
+			kerberosCacheOptions.put("doNotPrompt", "true");
+			kerberosCacheOptions.put("useTicketCache", "true");
+		}
+
+		String ticketCache = System.getenv("KRB5CCNAME");
+		if(ticketCache != null) {
+			if(IBM_JAVA) {
+				System.setProperty("KRB5CCNAME", ticketCache);
+			} else {
+				kerberosCacheOptions.put("ticketCache", ticketCache);
+			}
+		}
+
+		kerberosCacheOptions.put("renewTGT", "true");
+		kerberosCacheOptions.putAll(debugOptions);
+
+		userKerberosAce = new AppConfigurationEntry(
+				KerberosUtil.getKrb5LoginModuleName(),
+				AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
+				kerberosCacheOptions);
+
+	}
+
+	public static AppConfigurationEntry ticketCacheEntry() {
+		return userKerberosAce;
+	}
+
+	public static AppConfigurationEntry keytabEntry(String keytab, String principal) {
+
+		checkNotNull(keytab, "keytab");
+		checkNotNull(principal, "principal");
+
+		Map<String, String> keytabKerberosOptions = new HashMap<>();
+
+		if(IBM_JAVA) {
+			keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
+			keytabKerberosOptions.put("credsType", "both");
+		} else {
+			keytabKerberosOptions.put("keyTab", keytab);
+			keytabKerberosOptions.put("doNotPrompt", "true");
+			keytabKerberosOptions.put("useKeyTab", "true");
+			keytabKerberosOptions.put("storeKey", "true");
+		}
+
+		keytabKerberosOptions.put("principal", principal);
+		keytabKerberosOptions.put("refreshKrb5Config", "true");
+		keytabKerberosOptions.putAll(debugOptions);
+
+		AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
+				KerberosUtil.getKrb5LoginModuleName(),
+				AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+				keytabKerberosOptions);
+
+		return keytabKerberosAce;
+	}
+
+	private static String prependFileUri(String keytabPath) {
+		File f = new File(keytabPath);
+		return f.toURI().toString();
+	}
+}
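
Note on `keytabEntry`: on a non-IBM JVM the method yields a REQUIRED Krb5 login module entry with the keytab options shown in the diff. The sketch below rebuilds that entry without the Hadoop dependency. The hard-coded module class name is an assumption (the one Hadoop's `KerberosUtil.getKrb5LoginModuleName()` is expected to return on Oracle/OpenJDK JVMs), the IBM branch and the debug option are omitted, and the class name `KeytabEntrySketch` is hypothetical.

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class KeytabEntrySketch {

    // Assumption: Krb5 login module class name on Oracle/OpenJDK JVMs.
    static final String KRB5_MODULE = "com.sun.security.auth.module.Krb5LoginModule";

    /** Builds a keytab-based entry using the non-IBM options from the diff. */
    public static AppConfigurationEntry keytabEntry(String keytab, String principal) {
        Objects.requireNonNull(keytab, "keytab");
        Objects.requireNonNull(principal, "principal");

        Map<String, String> options = new HashMap<>();
        options.put("keyTab", keytab);
        options.put("doNotPrompt", "true");
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        options.put("principal", principal);
        options.put("refreshKrb5Config", "true");

        return new AppConfigurationEntry(KRB5_MODULE, LoginModuleControlFlag.REQUIRED, options);
    }

    public static void main(String[] args) {
        AppConfigurationEntry e = keytabEntry("/path/to/kerberos/keytab", "flink-user");
        System.out.println(e.getLoginModuleName() + " " + e.getOptions());
    }
}
```

Unlike the deleted JaasConfiguration, which kept the keytab options in a shared static map, the new method builds a fresh options map per call, so repeated calls with different keytabs do not interfere.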

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index d7fc6ff..d76e7a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -18,216 +18,162 @@
 
 package org.apache.flink.runtime.security;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.security.modules.JaasModule;
+import org.apache.flink.runtime.security.modules.SecurityModule;
+import org.apache.flink.runtime.security.modules.ZooKeeperModule;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Method;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /*
- * Utils for configuring security. The following security mechanism are supported:
+ * Utils for configuring security. The following security subsystems are supported:
  *
  * 1. Java Authentication and Authorization Service (JAAS)
  * 2. Hadoop's User Group Information (UGI)
+ * 3. ZooKeeper's process-wide security settings.
  */
 public class SecurityUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
 
-	public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
-
-	public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
-
-	private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
-
-	private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
-
 	private static SecurityContext installedContext = new NoOpSecurityContext();
 
+	private static List<SecurityModule> installedModules = null;
+
 	public static SecurityContext getInstalledContext() { return installedContext; }
 
+	@VisibleForTesting
+	static List<SecurityModule> getInstalledModules() {
+		return installedModules;
+	}
+
 	/**
-	 * Performs a static initialization of the JAAS and Hadoop UGI security mechanism.
-	 * It creates the in-memory JAAS configuration object which will serve appropriate
-	 * ApplicationConfigurationEntry for the connector login module implementation that
-	 * authenticates Kerberos identity using SASL/JAAS based mechanism.
+	 * Installs a process-wide security configuration.
+	 *
+	 * Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
 	 */
 	public static void install(SecurityConfiguration config) throws Exception {
 
-		if (!config.securityIsEnabled()) {
-			// do not perform any initialization if no Kerberos credentials are provided
-			return;
+		// install the security modules
+		List<SecurityModule> modules = new ArrayList<>();
+		try {
+			for (Class<? extends SecurityModule> moduleClass : config.getSecurityModules()) {
+				SecurityModule module = moduleClass.newInstance();
+				module.install(config);
+				modules.add(module);
+			}
 		}
+		catch(Exception ex) {
+			throw new Exception("unable to establish the security context", ex);
+		}
+		installedModules = modules;
 
-		// establish the JAAS config
-		JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
-		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-		populateSystemSecurityProperties(config.flinkConf);
-
-		// establish the UGI login user
-		UserGroupInformation.setConfiguration(config.hadoopConf);
-
-		// only configure Hadoop security if we have security enabled
-		if (UserGroupInformation.isSecurityEnabled()) {
-
-			final UserGroupInformation loginUser;
-
-			if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
-				String keytabPath = (new File(config.keytab)).getAbsolutePath();
-
-				UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-				loginUser = UserGroupInformation.getLoginUser();
-
-				// supplement with any available tokens
-				String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-				if (fileLocation != null) {
-				/*
-				 * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
-				 * used in the context of reading the stored tokens from UGI.
-				 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-				 * loginUser.addCredentials(cred);
-				*/
-					try {
-						Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-							File.class, org.apache.hadoop.conf.Configuration.class);
-						Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-							config.hadoopConf);
-						Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
-							Credentials.class);
-						addCredentialsMethod.invoke(loginUser, cred);
-					} catch (NoSuchMethodException e) {
-						LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
-					}
-				}
-			} else {
-				// login with current user credentials (e.g. ticket cache)
+		// install a security context
+		// use the Hadoop login user as the subject of the installed security context
+		if (!(installedContext instanceof NoOpSecurityContext)) {
+			LOG.warn("overriding previous security context");
+		}
+		UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+		installedContext = new HadoopSecurityContext(loginUser);
+	}
+
+	static void uninstall() {
+		if(installedModules != null) {
+			for (SecurityModule module : Lists.reverse(installedModules)) {
 				try {
-					//Use reflection API to get the login user object
-					//UserGroupInformation.loginUserFromSubject(null);
-					Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-					Subject subject = null;
-					loginUserFromSubjectMethod.invoke(null, subject);
-				} catch (NoSuchMethodException e) {
-					LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+					module.uninstall();
 				}
-
-				// note that the stored tokens are read automatically
-				loginUser = UserGroupInformation.getLoginUser();
-			}
-
-			LOG.info("Hadoop user set to {}", loginUser.toString());
-
-			boolean delegationToken = false;
-			final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
-			Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
-			for (Token<? extends TokenIdentifier> token : usrTok) {
-				final Text id = new Text(token.getIdentifier());
-				LOG.debug("Found user token " + id + " with " + token);
-				if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
-					delegationToken = true;
+				catch(UnsupportedOperationException ignored) {
 				}
-			}
-
-			if (!loginUser.hasKerberosCredentials()) {
-				//throw an error in non-yarn deployment if kerberos cache is not available
-				if (!delegationToken) {
-					LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
-					throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+				catch(SecurityModule.SecurityInstallException e) {
+					LOG.warn("unable to uninstall a security module", e);
 				}
 			}
-
-			if (!(installedContext instanceof NoOpSecurityContext)) {
-				LOG.warn("overriding previous security context");
-			}
-
-			installedContext = new HadoopSecurityContext(loginUser);
+			installedModules = null;
 		}
-	}
 
-	static void clearContext() {
 		installedContext = new NoOpSecurityContext();
 	}
 
-	/*
-	 * This method configures some of the system properties that are require for ZK and Kafka SASL authentication
-	 * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
-	 * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-	 * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and
-	 * Kafka current code behavior.
+	/**
+	 * The global security configuration.
+	 *
+	 * See {@link SecurityOptions} for corresponding configuration options.
 	 */
-	private static void populateSystemSecurityProperties(Configuration configuration) {
-		Preconditions.checkNotNull(configuration, "The supplied configuration was null");
+	public static class SecurityConfiguration {
 
-		boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
+		private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
+			Arrays.asList(HadoopModule.class, JaasModule.class, ZooKeeperModule.class));
 
-		if (disableSaslClient) {
-			LOG.info("SASL client auth for ZK will be disabled");
-			//SASL auth is disabled by default but will be enabled if specified in configuration
-			System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
-			return;
-		}
+		private final List<Class<? extends SecurityModule>> securityModules;
 
-		// load Jaas config file to initialize SASL
-		final File jaasConfFile;
-		try {
-			Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
-			InputStream jaasConfStream = SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
-			Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
-			jaasConfFile = jaasConfPath.toFile();
-			jaasConfFile.deleteOnExit();
-			jaasConfStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
-				"locate pseudo Jaas config provided with Flink", e);
-		}
+		private final org.apache.hadoop.conf.Configuration hadoopConf;
 
-		LOG.info("Enabling {} property with pseudo JAAS config file: {}",
-				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
+		private final boolean useTicketCache;
 
-		//ZK client module lookup the configuration to handle SASL.
-		//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
-		System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
+		private final String keytab;
 
-		String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
-		if (!StringUtils.isBlank(zkSaslServiceName)) {
-			LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
-			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName);
-		}
+		private final String principal;
 
-	}
+		private final List<String> loginContextNames;
 
-	/**
-	 * Inputs for establishing the security context.
-	 */
-	public static class SecurityConfiguration {
+		private final String zkServiceName;
 
-		private Configuration flinkConf;
+		private final String zkLoginContextName;
 
-		private org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+		/**
+		 * Create a security configuration from the global configuration.
+		 * @param flinkConf the Flink global configuration.
+		 */
+		public SecurityConfiguration(Configuration flinkConf) {
+			this(flinkConf, HadoopUtils.getHadoopConfiguration());
+		}
 
-		private String keytab;
+		/**
+		 * Create a security configuration from the global configuration and a Hadoop configuration.
+		 * @param flinkConf the Flink global configuration.
+		 * @param hadoopConf the Hadoop configuration.
+		 */
+		public SecurityConfiguration(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) {
+			this(flinkConf, hadoopConf, DEFAULT_MODULES);
+		}
 
-		private String principal;
+		/**
+		 * Create a security configuration from the global configuration, a Hadoop configuration, and a list of security modules.
+		 * @param flinkConf the Flink global configuration.
+		 * @param hadoopConf the Hadoop configuration.
+		 * @param securityModules the security modules to apply.
+		 */
+		public SecurityConfiguration(Configuration flinkConf,
+				org.apache.hadoop.conf.Configuration hadoopConf,
+				List<? extends Class<? extends SecurityModule>> securityModules) {
+			this.hadoopConf = checkNotNull(hadoopConf);
+			this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
+			this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
+			this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+			this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
+			this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+			this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
+			this.securityModules = Collections.unmodifiableList(securityModules);
+
+			validate();
+		}
 
 		public String getKeytab() {
 			return keytab;
@@ -237,48 +183,50 @@ public class SecurityUtils {
 			return principal;
 		}
 
-		public SecurityConfiguration(Configuration flinkConf) {
-			this.flinkConf = flinkConf;
+		public boolean useTicketCache() {
+			return useTicketCache;
+		}
 
-			String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
-			String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
-			validate(keytab, principal);
+		public org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+			return hadoopConf;
+		}
 
-			this.keytab = keytab;
-			this.principal = principal;
+		public List<Class<? extends SecurityModule>> getSecurityModules() {
+			return securityModules;
 		}
 
-		public SecurityConfiguration setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
-			this.hadoopConf = conf;
-			return this;
+		public List<String> getLoginContextNames() {
+			return loginContextNames;
 		}
 
-		private void validate(String keytab, String principal) {
-			LOG.debug("keytab {} and principal {} .", keytab, principal);
+		public String getZooKeeperServiceName() {
+			return zkServiceName;
+		}
 
-			if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
-					!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal)) {
-				if(StringUtils.isBlank(keytab)) {
-					LOG.warn("Keytab is null or empty");
-				}
+		public String getZooKeeperLoginContextName() {
+			return zkLoginContextName;
+		}
+
+		private void validate() {
+			if(!StringUtils.isBlank(keytab)) {
+				// principal is required
 				if(StringUtils.isBlank(principal)) {
-					LOG.warn("Principal is null or empty");
+					throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab requires a principal.");
 				}
-				throw new RuntimeException("Requires both keytab and principal to be provided");
-			}
 
-			if(!StringUtils.isBlank(keytab)) {
+				// check the keytab is readable
 				File keytabFile = new File(keytab);
-				if(!keytabFile.exists() || !keytabFile.isFile()) {
-					LOG.warn("Not a valid keytab: {} file", keytab);
-					throw new RuntimeException("Invalid keytab file: " + keytab + " passed");
+				if(!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
+					throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab is unreadable");
 				}
 			}
-
 		}
 
-		public boolean securityIsEnabled() {
-			return keytab != null && principal != null;
+		private static List<String> parseList(String value) {
+			if(value == null) {
+				return Collections.emptyList();
+			}
+			return Arrays.asList(value.split(","));
 		}
 	}
 

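The list parsing used for the login context names can be illustrated with a small standalone program. This is only a sketch: the class name `ContextListSketch` and the trimming variant `parseTrimmed` are hypothetical and not part of the commit. It shows that a plain `split(",")` keeps any whitespace around an entry, so a value such as `Client, KafkaClient` would presumably need to be written without the space.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ContextListSketch {

    // Mirrors parseList from the diff: null yields an empty list, otherwise split on commas.
    static List<String> parseList(String value) {
        if (value == null) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.split(","));
    }

    // Hypothetical variant (not in the commit): trims each entry and drops blank ones.
    static List<String> parseTrimmed(String value) {
        List<String> result = new ArrayList<>();
        if (value != null) {
            for (String item : value.split(",")) {
                String trimmed = item.trim();
                if (!trimmed.isEmpty()) {
                    result.add(trimmed);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Compare how the two variants treat a space after the comma.
        System.out.println(parseList("Client,KafkaClient"));
        System.out.println(parseList("Client, KafkaClient"));
        System.out.println(parseTrimmed("Client, KafkaClient"));
    }
}
```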
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
new file mode 100644
index 0000000..9344faf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Responsible for installing a Hadoop login user.
+ */
+public class HadoopModule implements SecurityModule {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);
+
+	UserGroupInformation loginUser;
+
+	@Override
+	public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+		UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
+
+		try {
+			if (UserGroupInformation.isSecurityEnabled() &&
+				!StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
+				String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
+
+				UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
+
+				loginUser = UserGroupInformation.getLoginUser();
+
+				// supplement with any available tokens
+				String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+				if (fileLocation != null) {
+					/*
+					 * Use reflection because these methods are not available in the Hadoop 1 profile.
+					 * The reflective calls below read the stored tokens and are equivalent to:
+					 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), securityConfig.getHadoopConfiguration());
+					 * loginUser.addCredentials(cred);
+					 */
+					try {
+						Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+							File.class, org.apache.hadoop.conf.Configuration.class);
+						Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
+							securityConfig.getHadoopConfiguration());
+						Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
+							Credentials.class);
+						addCredentialsMethod.invoke(loginUser, cred);
+					} catch (NoSuchMethodException e) {
+						LOG.warn("Could not find method implementations in the shaded jar.", e);
+					} catch (InvocationTargetException e) {
+						throw e.getTargetException();
+					}
+				}
+			} else {
+				// login with current user credentials (e.g. ticket cache, OS login)
+				// note that the stored tokens are read automatically
+				try {
+					//Use reflection API to get the login user object
+					//UserGroupInformation.loginUserFromSubject(null);
+					Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+					loginUserFromSubjectMethod.invoke(null, (Subject) null);
+				} catch (NoSuchMethodException e) {
+					LOG.warn("Could not find method implementations in the shaded jar.", e);
+				} catch (InvocationTargetException e) {
+					throw e.getTargetException();
+				}
+
+				loginUser = UserGroupInformation.getLoginUser();
+			}
+
+			if (UserGroupInformation.isSecurityEnabled()) {
+				// note: UGI::hasKerberosCredentials inaccurately reports false
+				// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+				// so we check only in ticket cache scenario.
+				if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) {
+					// a delegation token is an adequate substitute in most cases
+					if (!HadoopUtils.hasHDFSDelegationToken()) {
+						LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials");
+					}
+				}
+			}
+
+			LOG.info("Hadoop user set to {}", loginUser);
+
+		} catch (Throwable ex) {
+			throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
+		}
+	}
+
+	@Override
+	public void uninstall() throws SecurityInstallException {
+		throw new UnsupportedOperationException();
+	}
+}
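
The reflective calls in HadoopModule follow a common pattern: look up a method that may be missing from the library version on the classpath, skip it if absent, and unwrap InvocationTargetException so the caller sees the real failure. A minimal sketch of that pattern using only JDK classes follows; the class `OptionalMethodSketch` and its helper are hypothetical names, and `Integer.parseInt` merely stands in for the Hadoop methods.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class OptionalMethodSketch {

    /**
     * Invokes a public static method by name if it exists.
     * Returns null when the method is absent; rethrows the method's own exception on failure.
     */
    static Object invokeStaticIfPresent(Class<?> type, String name, Class<?>[] paramTypes, Object... args)
            throws Throwable {
        try {
            Method method = type.getMethod(name, paramTypes);
            return method.invoke(null, args);
        } catch (NoSuchMethodException e) {
            // the method is not available in this version of the library
            return null;
        } catch (InvocationTargetException e) {
            // unwrap so callers see the original failure rather than the reflection wrapper
            throw e.getTargetException();
        }
    }

    public static void main(String[] args) throws Throwable {
        Object parsed = invokeStaticIfPresent(Integer.class, "parseInt", new Class<?>[] {String.class}, "42");
        System.out.println("parseInt result: " + parsed);

        Object missing = invokeStaticIfPresent(Integer.class, "noSuchMethod", new Class<?>[0]);
        System.out.println("missing method result: " + missing);
    }
}
```

A null return is ambiguous for methods that legitimately return null or void, so a real helper would probably signal absence differently.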

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
new file mode 100644
index 0000000..f8b9bdf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Responsible for installing a process-wide JAAS configuration.
+ * <p>
+ * The installed configuration combines login modules based on:
+ * - the user-supplied JAAS configuration file, if any
+ * - a Kerberos keytab, if configured
+ * - any cached Kerberos credentials from the current environment
+ * <p>
+ * The module also installs a default JAAS config file (if necessary) for
+ * compatibility with ZK and Kafka.  Note that the JRE actually draws on numerous file locations.
+ * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
+ * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+ */
+@Internal
+public class JaasModule implements SecurityModule {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class);
+
+	static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+	static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
+
+	private String priorConfigFile;
+	private javax.security.auth.login.Configuration priorConfig;
+
+	private DynamicConfiguration currentConfig;
+
+	@Override
+	public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+		// ensure that a config file is always defined, for compatibility with
+		// ZK and Kafka which check for the system property and existence of the file
+		priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
+		if (priorConfigFile == null) {
+			File configFile = generateDefaultConfigFile();
+			System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
+		}
+
+		// read the JAAS configuration file
+		priorConfig = javax.security.auth.login.Configuration.getConfiguration();
+
+		// construct a dynamic JAAS configuration
+		currentConfig = new DynamicConfiguration(priorConfig);
+
+		// wire up the configured JAAS login contexts to use the krb5 entries
+		AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
+		if(krb5Entries != null) {
+			for (String app : securityConfig.getLoginContextNames()) {
+				currentConfig.addAppConfigurationEntry(app, krb5Entries);
+			}
+		}
+
+		javax.security.auth.login.Configuration.setConfiguration(currentConfig);
+	}
+
+	@Override
+	public void uninstall() throws SecurityInstallException {
+		if(priorConfigFile != null) {
+			System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
+		} else {
+			System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
+		}
+		javax.security.auth.login.Configuration.setConfiguration(priorConfig);
+	}
+
+	public DynamicConfiguration getCurrentConfiguration() {
+		return currentConfig;
+	}
+
+	private static AppConfigurationEntry[] getAppConfigurationEntries(SecurityUtils.SecurityConfiguration securityConfig) {
+
+		AppConfigurationEntry userKerberosAce = null;
+		if (securityConfig.useTicketCache()) {
+			userKerberosAce = KerberosUtils.ticketCacheEntry();
+		}
+		AppConfigurationEntry keytabKerberosAce = null;
+		if (securityConfig.getKeytab() != null) {
+			keytabKerberosAce = KerberosUtils.keytabEntry(securityConfig.getKeytab(), securityConfig.getPrincipal());
+		}
+
+		AppConfigurationEntry[] appConfigurationEntry;
+		if (userKerberosAce != null && keytabKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce, userKerberosAce};
+		} else if (keytabKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce};
+		} else if (userKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[]{userKerberosAce};
+		} else {
+			return null;
+		}
+
+		return appConfigurationEntry;
+	}
+
+	/**
+	 * Generate the default JAAS config file.
+	 */
+	private static File generateDefaultConfigFile() {
+		final File jaasConfFile;
+		try {
+			Path jaasConfPath = Files.createTempFile("jaas-", ".conf");
+			try (InputStream resourceStream = JaasModule.class.getClassLoader().getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
+				Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+			}
+			jaasConfFile = jaasConfPath.toFile();
+			jaasConfFile.deleteOnExit();
+		} catch (IOException e) {
+			throw new RuntimeException("unable to generate a JAAS configuration file", e);
+		}
+		return jaasConfFile;
+	}
+}
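
JaasModule wraps the prior JAAS configuration in a DynamicConfiguration and attaches Kerberos entries to each configured login context name. DynamicConfiguration and KerberosUtils are not shown in this part of the diff, so the following is only a rough sketch of the idea, not the commit's implementation. The class `LayeredJaasConfigSketch`, the merge order (prior entries first), the REQUIRED control flag, and the example keytab path and principal are all assumptions; the option names are those documented for the JDK's Krb5LoginModule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

final class LayeredJaasConfigSketch extends Configuration {

    private final Configuration delegate; // prior configuration, may be null
    private final Map<String, List<AppConfigurationEntry>> extra = new HashMap<>();

    LayeredJaasConfigSketch(Configuration delegate) {
        this.delegate = delegate;
    }

    // Registers additional entries for a login context name.
    void addEntries(String contextName, AppConfigurationEntry... entries) {
        extra.computeIfAbsent(contextName, k -> new ArrayList<>()).addAll(Arrays.asList(entries));
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        List<AppConfigurationEntry> merged = new ArrayList<>();
        if (delegate != null) {
            AppConfigurationEntry[] prior = delegate.getAppConfigurationEntry(name);
            if (prior != null) {
                merged.addAll(Arrays.asList(prior));
            }
        }
        List<AppConfigurationEntry> added = extra.get(name);
        if (added != null) {
            merged.addAll(added);
        }
        // JAAS contract: return null when there are no entries for the name
        return merged.isEmpty() ? null : merged.toArray(new AppConfigurationEntry[0]);
    }

    // Builds a keytab-based Kerberos entry (assumed option set).
    static AppConfigurationEntry keytabEntry(String keytab, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("keyTab", keytab);
        options.put("principal", principal);
        options.put("storeKey", "true");
        options.put("doNotPrompt", "true");
        return new AppConfigurationEntry(
            "com.sun.security.auth.module.Krb5LoginModule",
            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
            options);
    }

    public static void main(String[] args) {
        LayeredJaasConfigSketch config = new LayeredJaasConfigSketch(null);
        config.addEntries("KafkaClient", keytabEntry("/path/to/user.keytab", "user@EXAMPLE.COM"));
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry("KafkaClient");
        System.out.println(entries.length + " entry for KafkaClient: " + entries[0].getLoginModuleName());
    }
}
```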

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
new file mode 100644
index 0000000..fbe1db9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+import java.security.GeneralSecurityException;
+
+/**
+ * An installable security module.
+ */
+public interface SecurityModule {
+
+	/**
+	 * Install the security module.
+	 *
+	 * @param configuration the security configuration.
+	 * @throws SecurityInstallException if the security module couldn't be installed.
+	 */
+	void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException;
+
+	/**
+	 * Uninstall the security module.
+	 *
+	 * @throws SecurityInstallException if the security module couldn't be uninstalled.
+	 * @throws UnsupportedOperationException if the security module doesn't support uninstallation.
+	 */
+	void uninstall() throws SecurityInstallException;
+
+	/**
+	 * Indicates a problem with installing or uninstalling a security module.
+	 */
+	class SecurityInstallException extends GeneralSecurityException {
+		private static final long serialVersionUID = 1L;
+
+		public SecurityInstallException(String msg) {
+			super(msg);
+		}
+
+		public SecurityInstallException(String msg, Throwable cause) {
+			super(msg, cause);
+		}
+	}
+}
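
The SecurityModule contract lets uninstall() throw UnsupportedOperationException, and the SecurityUtils hunk earlier in this diff ignores that exception while logging SecurityInstallException as a warning. A minimal sketch of such a lifecycle is given below. The `Module` interface is a hypothetical stand-in for SecurityModule, and uninstalling in reverse order is an assumption, since the loop itself is not visible in this section.

```java
import java.util.ArrayList;
import java.util.List;

final class ModuleLifecycleSketch {

    /** Hypothetical stand-in for the SecurityModule interface. */
    interface Module {
        void install() throws Exception;
        void uninstall() throws Exception;
    }

    private final List<Module> installed = new ArrayList<>();

    // Installs the modules in the given order and remembers them.
    void installAll(List<Module> modules) throws Exception {
        for (Module module : modules) {
            module.install();
            installed.add(module);
        }
    }

    /** Uninstalls in reverse order and returns the number of modules actually uninstalled. */
    int uninstallAll() {
        int count = 0;
        for (int i = installed.size() - 1; i >= 0; i--) {
            try {
                installed.get(i).uninstall();
                count++;
            } catch (UnsupportedOperationException ignored) {
                // the module cannot be uninstalled; leave it in place
            } catch (Exception e) {
                System.err.println("unable to uninstall a security module: " + e);
            }
        }
        installed.clear();
        return count;
    }

    public static void main(String[] args) throws Exception {
        List<Module> modules = new ArrayList<>();
        modules.add(new Module() {
            @Override public void install() { System.out.println("install: restorable module"); }
            @Override public void uninstall() { System.out.println("uninstall: restorable module"); }
        });
        modules.add(new Module() {
            @Override public void install() { System.out.println("install: permanent module"); }
            @Override public void uninstall() { throw new UnsupportedOperationException(); }
        });
        ModuleLifecycleSketch lifecycle = new ModuleLifecycleSketch();
        lifecycle.installAll(modules);
        System.out.println("uninstalled modules: " + lifecycle.uninstallAll());
    }
}
```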

http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
new file mode 100644
index 0000000..c0ba4a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+/**
+ * Responsible for installing a process-wide ZooKeeper security configuration.
+ */
+public class ZooKeeperModule implements SecurityModule {
+
+	private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+	/**
+	 * A system property for setting whether ZK uses SASL.
+	 */
+	private static final String ZK_ENABLE_CLIENT_SASL = "zookeeper.sasl.client";
+
+	/**
+	 * A system property for setting the expected ZooKeeper service name.
+	 */
+	private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+	/**
+	 * A system property for setting the login context name to use.
+	 */
+	private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
+
+	private String priorServiceName;
+
+	private String priorLoginContextName;
+
+	@Override
+	public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+
+		priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
+		if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
+			System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
+		}
+
+		priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
+		if (!"Client".equals(configuration.getZooKeeperLoginContextName())) {
+			System.setProperty(ZK_LOGIN_CONTEXT_NAME, configuration.getZooKeeperLoginContextName());
+		}
+	}
+
+	@Override
+	public void uninstall() throws SecurityInstallException {
+		if(priorServiceName != null) {
+			System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
+		} else {
+			System.clearProperty(ZK_SASL_CLIENT_USERNAME);
+		}
+		if(priorLoginContextName != null) {
+			System.setProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
+		} else {
+			System.clearProperty(ZK_LOGIN_CONTEXT_NAME);
+		}
+	}
+
+}
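
ZooKeeperModule and JaasModule both remember the prior value of a system property on install and restore or clear it on uninstall. That save-and-restore step can be sketched in isolation as follows; the class `PropertyOverrideSketch` is a hypothetical helper rather than part of the commit, and `"Client"` is used only as an example value.

```java
final class PropertyOverrideSketch {

    private final String key;
    private String priorValue;

    PropertyOverrideSketch(String key) {
        this.key = key;
    }

    // Remembers the current value (possibly null) and sets the new one.
    void install(String value) {
        priorValue = System.getProperty(key);
        System.setProperty(key, value);
    }

    // Restores the remembered value, or clears the property if there was none.
    void uninstall() {
        if (priorValue != null) {
            System.setProperty(key, priorValue);
        } else {
            System.clearProperty(key);
        }
    }

    public static void main(String[] args) {
        PropertyOverrideSketch override = new PropertyOverrideSketch("zookeeper.sasl.clientconfig");
        override.install("Client");
        System.out.println("during: " + System.getProperty("zookeeper.sasl.clientconfig"));
        override.uninstall();
        System.out.println("after: " + System.getProperty("zookeeper.sasl.clientconfig"));
    }
}
```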