You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/11 18:06:52 UTC

[1/5] flink git commit: [FLINK-5364] [security] Fix documentation setup for Kerberos

Repository: flink
Updated Branches:
  refs/heads/release-1.2 8d3ad4515 -> 699f4b05b


[FLINK-5364] [security] Fix documentation setup for Kerberos


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/699f4b05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/699f4b05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/699f4b05

Branch: refs/heads/release-1.2
Commit: 699f4b05b36de49b6892f0cc1222a5a59179b407
Parents: 00193f7
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jan 11 14:32:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 19:06:10 2017 +0100

----------------------------------------------------------------------
 docs/internals/flink_security.md | 146 ----------------------------------
 docs/ops/security-kerberos.md    | 145 +++++++++++++++++++++++++++++++++
 2 files changed, 145 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/699f4b05/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
deleted file mode 100644
index a83f3b9..0000000
--- a/docs/internals/flink_security.md
+++ /dev/null
@@ -1,146 +0,0 @@
----
-title:  "Flink Security"
-# Top navigation
-top-nav-group: internals
-top-nav-pos: 10
-top-nav-title: Flink Security
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos), 
-filesystems, connectors, and state backends.
-
-## Objective
-The primary goals of the Flink Kerberos security infrastructure are:
-1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
-2. to authenticate to ZooKeeper (if configured to use SASL)
-3. to authenticate to Hadoop components (e.g. HDFS, HBase) 
-
-In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure 
-data sources throughout the life of the job.  Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
-or ticket cache entry.
-
-The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
-or with Hadoop delegation tokens.   Keep in mind that all jobs share the credential configured for a given cluster.   To use a different keytab
-for for a certain job, simply launch a separate Flink cluster with a different configuration.   Numerous Flink clusters may run side-by-side in a YARN
-or Mesos environment.
-
-## How Flink Security works
-In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.).  While satisfying the security requirements for all connectors is an ongoing effort,
-Flink provides first-class support for Kerberos authentication only.  The following services and connectors are tested for Kerberos authentication:
-
-- Kafka (0.9+)
-- HDFS
-- HBase
-- ZooKeeper
-
-Note that it is possible to enable the use of Kerberos independently for each service or connector.  For example, the user may enable 
-Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa.    The shared element is the configuration of 
-Kerbreros credentials, which is then explicitly used by each component.
-
-The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
-are installed at startup.  The next section describes each security module.
-
-### Hadoop Security Module
-This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context.   The login user is
-then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
-
-If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured.  Otherwise,
-the login user conveys only the user identity of the OS account that launched the cluster.
-
-### JAAS Security Module
-This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
-Kafka, and other such components that rely on JAAS.
-
-Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html).   Static entries override any
-dynamic entries provided by this module.
-
-### ZooKeeper Security Module
-This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
-and the JAAS login context name (default: `Client`).
-
-## Security Configuration
-
-### Flink Configuration
-The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
-
-- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
-
-A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
-
-- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-
-- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-
-These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context.  Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section).   To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
-
-- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
-
-ZooKeeper-related configuration overrides:
-
-- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.
-
-- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
-one of the values specified in `security.kerberos.login.contexts`.
-
-### Hadoop Configuration
-
-The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`).   The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
-
-Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts.   In this scenario, the Flink CLI acquires Hadoop
-delegation tokens (for HDFS and for HBase).
-
-## Deployment Modes
-Here is some information specific to each deployment mode.
-
-### Standalone Mode
-
-Steps to run a secure Flink cluster in standalone/cluster mode:
-1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
-2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
-3. Deploy Flink cluster as normal.
-
-### YARN/Mesos Mode
-
-Steps to run a secure Flink cluster in YARN/Mesos mode:
-1. Add security-related configuration options to the Flink configuration file on the client.
-2. Ensure that the keytab file exists at the path as indicated by `security.kerberos.login.keytab` on the client node.
-3. Deploy Flink cluster as normal.
-
-In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.
-
-For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
-
-#### Using `kinit` (YARN only)
-
-In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
-This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it.  The main drawback is
-that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
-
-Steps to run a secure Flink cluster using `kinit`:
-1. Add security-related configuration options to the Flink configuration file on the client.
-2. Login using the `kinit` command.
-3. Deploy Flink cluster as normal.
-
-## Further Details
-### Ticket Renewal
-Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
-Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab.  In the delegation token scenario,
-YARN itself renews the token (up to its maximum lifespan).

http://git-wip-us.apache.org/repos/asf/flink/blob/699f4b05/docs/ops/security-kerberos.md
----------------------------------------------------------------------
diff --git a/docs/ops/security-kerberos.md b/docs/ops/security-kerberos.md
new file mode 100644
index 0000000..2afe760
--- /dev/null
+++ b/docs/ops/security-kerberos.md
@@ -0,0 +1,145 @@
+---
+title:  "Kerberos Authentication Setup and Configuration"
+nav-parent_id: setup
+nav-pos: 10
+nav-title: Kerberos
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos), 
+filesystems, connectors, and state backends.
+
+## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase) 
+
+In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure 
+data sources throughout the life of the job.  Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens.   Keep in mind that all jobs share the credential configured for a given cluster.   To use a different keytab
+for for a certain job, simply launch a separate Flink cluster with a different configuration.   Numerous Flink clusters may run side-by-side in a YARN
+or Mesos environment.
+
+## How Flink Security works
+In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.).  While satisfying the security requirements for all connectors is an ongoing effort,
+Flink provides first-class support for Kerberos authentication only.  The following services and connectors are tested for Kerberos authentication:
+
+- Kafka (0.9+)
+- HDFS
+- HBase
+- ZooKeeper
+
+Note that it is possible to enable the use of Kerberos independently for each service or connector.  For example, the user may enable 
+Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa.    The shared element is the configuration of 
+Kerbreros credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup.  The next section describes each security module.
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context.   The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
+
+If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured.  Otherwise,
+the login user conveys only the user identity of the OS account that launched the cluster.
+
+### JAAS Security Module
+This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
+Kafka, and other such components that rely on JAAS.
+
+Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html).   Static entries override any
+dynamic entries provided by this module.
+
+### ZooKeeper Security Module
+This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
+and the JAAS login context name (default: `Client`).
+
+## Security Configuration
+
+### Flink Configuration
+The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
+
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
+
+A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
+
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
+
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
+
+These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context.  Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section).   To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
+
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
+
+ZooKeeper-related configuration overrides:
+
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.
+
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
+
+### Hadoop Configuration
+
+The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`).   The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
+
+Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts.   In this scenario, the Flink CLI acquires Hadoop
+delegation tokens (for HDFS and for HBase).
+
+## Deployment Modes
+Here is some information specific to each deployment mode.
+
+### Standalone Mode
+
+Steps to run a secure Flink cluster in standalone/cluster mode:
+1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
+3. Deploy Flink cluster as normal.
+
+### YARN/Mesos Mode
+
+Steps to run a secure Flink cluster in YARN/Mesos mode:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Ensure that the keytab file exists at the path as indicated by `security.kerberos.login.keytab` on the client node.
+3. Deploy Flink cluster as normal.
+
+In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.
+
+For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
+
+#### Using `kinit` (YARN only)
+
+In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
+This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it.  The main drawback is
+that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
+
+Steps to run a secure Flink cluster using `kinit`:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Login using the `kinit` command.
+3. Deploy Flink cluster as normal.
+
+## Further Details
+### Ticket Renewal
+Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
+Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab.  In the delegation token scenario,
+YARN itself renews the token (up to its maximum lifespan).


[3/5] flink git commit: [FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/resources/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/flink-jaas.conf b/flink-runtime/src/main/resources/flink-jaas.conf
index 7f0f06b..d287ff4 100644
--- a/flink-runtime/src/main/resources/flink-jaas.conf
+++ b/flink-runtime/src/main/resources/flink-jaas.conf
@@ -1,3 +1,4 @@
+/**
 ################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
@@ -17,10 +18,6 @@
 ################################################################################
 # We are using this file as an workaround for the Kafka and ZK SASL implementation
 # since they explicitly look for java.security.auth.login.config property
-# The file itself is not used by the application since the internal implementation
-# uses a process-wide in-memory java security configuration object.
 # Please do not edit/delete this file - See FLINK-3929
-sample {
-  useKeyTab=false
-  useTicketCache=true;
-}
+**/
+

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
index 0570f28..1847ec4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.overlays;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.junit.Rule;
@@ -46,7 +47,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase {
 		ContainerSpecification spec = new ContainerSpecification();
 		overlay.configure(spec);
 
-		assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, null));
+		assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
 		checkArtifact(spec, TARGET_PATH);
 	}
 
@@ -64,7 +65,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase {
 		final Configuration conf = new Configuration();
 		File keytab = tempFolder.newFile();
 
-		conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab.getAbsolutePath());
+		conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytab.getAbsolutePath());
 		KeytabOverlay.Builder builder = KeytabOverlay.newBuilder().fromEnvironment(conf);
 		assertEquals(builder.keytabPath, keytab);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
deleted file mode 100644
index 89e5ef9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security;
-
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.junit.Test;
-
-import javax.security.auth.login.AppConfigurationEntry;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the {@link JaasConfiguration}.
- */
-public class JaasConfigurationTest {
-
-	@Test
-	public void testInvalidKerberosParams() {
-		String keytab = "user.keytab";
-		String principal = null;
-		try {
-			new JaasConfiguration(keytab, principal);
-		} catch(RuntimeException re) {
-			assertEquals("Both keytab and principal are required and cannot be empty",re.getMessage());
-		}
-	}
-
-	@Test
-	public void testDefaultAceEntry() {
-		JaasConfiguration conf = new JaasConfiguration(null,null);
-		javax.security.auth.login.Configuration.setConfiguration(conf);
-		final AppConfigurationEntry[] entry = conf.getAppConfigurationEntry("test");
-		AppConfigurationEntry ace = entry[0];
-		assertEquals(ace.getLoginModuleName(), KerberosUtil.getKrb5LoginModuleName());
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
new file mode 100644
index 0000000..4c899e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for the {@link KerberosUtils}.
+ */
+public class KerberosUtilsTest {
+
+	@Test
+	public void testTicketCacheEntry() {
+		AppConfigurationEntry entry = KerberosUtils.ticketCacheEntry();
+		assertNotNull(entry);
+	}
+
+	@Test
+	public void testKeytabEntry() {
+		String keytab = "user.keytab";
+		String principal = "user";
+		AppConfigurationEntry entry = KerberosUtils.keytabEntry(keytab, principal);
+		assertNotNull(entry);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index 2648a7a..c5624f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -18,97 +18,64 @@
 package org.apache.flink.runtime.security;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.flink.runtime.security.modules.SecurityModule;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.lang.reflect.Method;
+import java.util.Collections;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 /**
  * Tests for the {@link SecurityUtils}.
  */
 public class SecurityUtilsTest {
 
-	@AfterClass
-	public static void afterClass() {
-		SecurityUtils.clearContext();
-		System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
-	}
+	static class TestSecurityModule implements SecurityModule {
+		boolean installed;
 
-	@Test
-	public void testCreateInsecureHadoopCtx() {
-		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(new Configuration());
-		try {
-			SecurityUtils.install(sc);
-			assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName());
-		} catch (Exception e) {
-			fail(e.getMessage());
+		@Override
+		public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+			installed = true;
 		}
-	}
 
-	@Test
-	public void testInvalidUGIContext() {
-		try {
-			new HadoopSecurityContext(null);
-		} catch (RuntimeException re) {
-			assertEquals("UGI passed cannot be null",re.getMessage());
+		@Override
+		public void uninstall() throws SecurityInstallException {
+			installed = false;
 		}
 	}
 
+	@AfterClass
+	public static void afterClass() {
+		SecurityUtils.uninstall();
+	}
+
 	@Test
-	/**
-	 * The Jaas configuration file provided should not be overridden.
-	 */
-	public void testJaasPropertyOverride() throws Exception {
-		String confFile = "jaas.conf";
-		System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, confFile);
+	public void testModuleInstall() throws Exception {
+		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(
+			new Configuration(), new org.apache.hadoop.conf.Configuration(),
+			Collections.singletonList(TestSecurityModule.class));
 
-		SecurityUtils.install(new SecurityUtils.SecurityConfiguration(new Configuration()));
+		SecurityUtils.install(sc);
+		assertEquals(1, SecurityUtils.getInstalledModules().size());
+		TestSecurityModule testModule = (TestSecurityModule) SecurityUtils.getInstalledModules().get(0);
+		assertTrue(testModule.installed);
 
-		Assert.assertEquals(
-			confFile,
-			System.getProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG));
+		SecurityUtils.uninstall();
+		assertNull(SecurityUtils.getInstalledModules());
+		assertFalse(testModule.installed);
 	}
 
+	@Test
+	public void testSecurityContext() throws Exception {
+		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(
+			new Configuration(), new org.apache.hadoop.conf.Configuration(),
+			Collections.singletonList(TestSecurityModule.class));
 
-	private String getOSUserName() throws Exception {
-		String userName = "";
-		OperatingSystem os = OperatingSystem.getCurrentOperatingSystem();
-		String className;
-		String methodName;
-
-		switch(os) {
-			case LINUX:
-			case MAC_OS:
-				className = "com.sun.security.auth.module.UnixSystem";
-				methodName = "getUsername";
-				break;
-			case WINDOWS:
-				className = "com.sun.security.auth.module.NTSystem";
-				methodName = "getName";
-				break;
-			case SOLARIS:
-				className = "com.sun.security.auth.module.SolarisSystem";
-				methodName = "getUsername";
-				break;
-			case FREE_BSD:
-			case UNKNOWN:
-			default:
-				className = null;
-				methodName = null;
-		}
+		SecurityUtils.install(sc);
+		assertEquals(HadoopSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
 
-		if( className != null ){
-			Class<?> c = Class.forName( className );
-			Method method = c.getDeclaredMethod( methodName );
-			Object o = c.newInstance();
-			userName = (String) method.invoke( o );
-		}
-		return userName;
+		SecurityUtils.uninstall();
+		assertEquals(NoOpSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index de715c6..10450c3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.configuration.ConfigConstants;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.junit.rules.TemporaryFolder;
@@ -30,10 +30,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.File;
-import java.io.FileWriter;
-import java.io.BufferedWriter;
-import java.io.PrintWriter;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -111,11 +107,11 @@ public class SecureTestEnvironment {
 			//the context can be reinitialized with Hadoop configuration by calling
 			//ctx.setHadoopConfiguration() for the UGI implementation to work properly.
 			//See Yarn test case module for reference
-			createJaasConfig(baseDirForSecureRun);
 			Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
-			flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
-			flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
-			flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false);
+			flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, testKeytab);
+			flinkConfig.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false);
+			flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, testPrincipal);
+			flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Client,KafkaClient");
 			SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
 			TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
 
@@ -178,8 +174,8 @@ public class SecureTestEnvironment {
 			conf = flinkConf;
 		}
 
-		conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY , testKeytab);
-		conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY , testPrincipal);
+		conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB , testKeytab);
+		conf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL , testPrincipal);
 
 		return conf;
 	}
@@ -190,22 +186,19 @@ public class SecureTestEnvironment {
 
 		if(testZkServerPrincipal != null ) {
 			TestingSecurityContext.ClientSecurityConfiguration zkServer =
-					new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab,
-							"Server", "zk-server");
+					new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab);
 			clientSecurityConfigurationMap.put("Server",zkServer);
 		}
 
 		if(testZkClientPrincipal != null ) {
 			TestingSecurityContext.ClientSecurityConfiguration zkClient =
-					new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab,
-							"Client", "zk-client");
+					new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab);
 			clientSecurityConfigurationMap.put("Client",zkClient);
 		}
 
 		if(testKafkaServerPrincipal != null ) {
 			TestingSecurityContext.ClientSecurityConfiguration kafkaServer =
-					new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab,
-							"KafkaServer", "kafka-server");
+					new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab);
 			clientSecurityConfigurationMap.put("KafkaServer",kafkaServer);
 		}
 
@@ -220,23 +213,4 @@ public class SecureTestEnvironment {
 		return hadoopServicePrincipal;
 	}
 
-	/*
-	 * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL
-	 * implementation lookup java.security.auth.login.config
-	 */
-	private static void  createJaasConfig(File baseDirForSecureRun) {
-
-		try(FileWriter fw = new FileWriter(new File(baseDirForSecureRun, SecurityUtils.JAAS_CONF_FILENAME), true);
-			BufferedWriter bw = new BufferedWriter(fw);
-			PrintWriter out = new PrintWriter(bw))
-		{
-			out.println("sample {");
-			out.println("useKeyTab=false");
-			out.println("useTicketCache=true;");
-			out.println("};");
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while trying to create JAAS config.", e);
-		}
-
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
deleted file mode 100644
index 25b2362..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.security.JaasConfiguration;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * {@link TestingJaasConfiguration} for handling the integration test case since it requires to manage
- * client principal as well as server principals of Hadoop/ZK which expects the host name to be populated
- * in specific way (localhost vs 127.0.0.1). This provides an abstraction to handle more than one Login Module
- * since the default {@link JaasConfiguration} behavior only supports global/unique principal identifier
- */
-
-@Internal
-public class TestingJaasConfiguration extends JaasConfiguration {
-
-	private static final Logger LOG = LoggerFactory.getLogger(TestingJaasConfiguration.class);
-
-	public Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap;
-
-	TestingJaasConfiguration(String keytab, String principal, Map<String,
-			TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap) {
-		super(keytab, principal);
-		this.clientSecurityConfigurationMap = clientSecurityConfigurationMap;
-	}
-
-	@Override
-	public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
-
-		LOG.debug("In TestingJaasConfiguration - Application Requested: {}", applicationName);
-
-		AppConfigurationEntry[] appConfigurationEntry = super.getAppConfigurationEntry(applicationName);
-
-		if(clientSecurityConfigurationMap != null && clientSecurityConfigurationMap.size() > 0) {
-
-			if(clientSecurityConfigurationMap.containsKey(applicationName)) {
-
-				LOG.debug("In TestingJaasConfiguration - Application: {} found in the supplied context", applicationName);
-
-				TestingSecurityContext.ClientSecurityConfiguration conf = clientSecurityConfigurationMap.get(applicationName);
-
-				if(appConfigurationEntry != null && appConfigurationEntry.length > 0) {
-
-					for(int count=0; count < appConfigurationEntry.length; count++) {
-
-						AppConfigurationEntry ace = appConfigurationEntry[count];
-
-						if (ace.getOptions().containsKey("keyTab")) {
-
-							String keyTab = conf.getKeytab();
-							String principal = conf.getPrincipal();
-
-							LOG.debug("In TestingJaasConfiguration - Application: {} from the supplied context will " +
-									"use Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, principal);
-
-							Map<String, String> newKeytabKerberosOptions = new HashMap<>();
-							newKeytabKerberosOptions.putAll(getKeytabKerberosOptions());
-
-							newKeytabKerberosOptions.put("keyTab", keyTab);
-							newKeytabKerberosOptions.put("principal", principal);
-
-							AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
-									KerberosUtil.getKrb5LoginModuleName(),
-									AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-									newKeytabKerberosOptions);
-							appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce};
-
-							LOG.debug("---->Login Module is using Keytab based configuration<------");
-							LOG.debug("Login Module Name: " + keytabKerberosAce.getLoginModuleName());
-							LOG.debug("Control Flag: " + keytabKerberosAce.getControlFlag());
-							LOG.debug("Options: " + keytabKerberosAce.getOptions());
-						}
-					}
-				}
-			}
-
-		}
-
-		return appConfigurationEntry;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
index 4343013..ff1810b 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -19,10 +19,16 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.JaasModule;
 
+import javax.security.auth.login.AppConfigurationEntry;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /*
  * Test security context to support handling both client and server principals in MiniKDC
  * This class is used only in integration test code for connectors like Kafka, HDFS etc.,
@@ -36,21 +42,20 @@ public class TestingSecurityContext {
 
 		SecurityUtils.install(config);
 
-		// establish the JAAS config for Test environment
-		TestingJaasConfiguration jaasConfig = new TestingJaasConfiguration(config.getKeytab(),
-				config.getPrincipal(), clientSecurityConfigurationMap);
-		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+		// install dynamic JAAS entries
+		checkArgument(config.getSecurityModules().contains(JaasModule.class));
+		DynamicConfiguration jaasConf = (DynamicConfiguration) javax.security.auth.login.Configuration.getConfiguration();
+		for(Map.Entry<String,ClientSecurityConfiguration> e : clientSecurityConfigurationMap.entrySet()) {
+			AppConfigurationEntry entry = KerberosUtils.keytabEntry(e.getValue().getKeytab(), e.getValue().getPrincipal());
+			jaasConf.addAppConfigurationEntry(e.getKey(), entry);
+		}
 	}
 
 	public static class ClientSecurityConfiguration {
 
-		private String principal;
-
-		private String keytab;
+		private final String principal;
 
-		private String moduleName;
-
-		private String jaasServiceName;
+		private final String keytab;
 
 		public String getPrincipal() {
 			return principal;
@@ -60,21 +65,10 @@ public class TestingSecurityContext {
 			return keytab;
 		}
 
-		public String getModuleName() {
-			return moduleName;
-		}
-
-		public String getJaasServiceName() {
-			return jaasServiceName;
-		}
-
-		public ClientSecurityConfiguration(String principal, String keytab, String moduleName, String jaasServiceName) {
+		public ClientSecurityConfiguration(String principal, String keytab) {
 			this.principal = principal;
 			this.keytab = keytab;
-			this.moduleName = moduleName;
-			this.jaasServiceName = jaasServiceName;
 		}
-
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 45fd8d0..d3558a9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestingSecurityContext;
@@ -53,13 +53,13 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
 				SecureTestEnvironment.getTestKeytab());
 
 		Configuration flinkConfig = new Configuration();
-		flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
 				SecureTestEnvironment.getTestKeytab());
-		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
 				SecureTestEnvironment.getHadoopServicePrincipal());
 
-		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
-		ctx.setHadoopConfiguration(yarnConfiguration);
+		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig,
+				yarnConfiguration);
 		try {
 			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index dc7cca3..ca8a0da 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -424,8 +425,8 @@ public abstract class YarnTestBase extends TestLogger {
 					LOG.info("writing keytab: " + keytab + " and principal: " + principal + " to config file");
 					out.println("");
 					out.println("#Security Configurations Auto Populated ");
-					out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab);
-					out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
+					out.println(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + ": " + keytab);
+					out.println(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + ": " + principal);
 					out.println("");
 				} catch (IOException e) {
 					throw new RuntimeException("Exception occured while trying to append the security configurations.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ca18439..70f3222 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,11 +22,11 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -404,6 +404,18 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	@Override
 	public YarnClusterClient deploy() {
 		try {
+			if(UserGroupInformation.isSecurityEnabled()) {
+				// note: UGI::hasKerberosCredentials inaccurately reports false
+				// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+				// so we check only in ticket cache scenario.
+				boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+				UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+				if (useTicketCache && !loginUser.hasKerberosCredentials()) {
+					LOG.error("Hadoop security is enabled but the login user does not have Kerberos credentials");
+					throw new RuntimeException("Hadoop security is enabled but the login user " +
+							"does not have Kerberos credentials");
+				}
+			}
 			return deployInternal();
 		} catch (Exception e) {
 			throw new RuntimeException("Couldn't deploy Yarn cluster", e);
@@ -583,12 +595,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		//check if there is a JAAS config file
-		File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityUtils.JAAS_CONF_FILENAME);
-		if (jaasConfigFile.exists() && jaasConfigFile.isFile()) {
-			effectiveShipFiles.add(jaasConfigFile);
-		}
-
 		addLibFolderToShipFiles(effectiveShipFiles);
 
 		// add the user jar to the classpath of the to-be-created cluster
@@ -730,7 +736,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// setup security tokens
 		LocalResource keytabResource = null;
 		Path remotePathKeytab = null;
-		String keytab = flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+		String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
 		if(keytab != null) {
 			LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
 			keytabResource = Records.newRecord(LocalResource.class);
@@ -773,7 +779,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		if(keytabResource != null) {
 			appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString() );
-			String principal = flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+			String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
 			appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal );
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 1826d43..e4027d4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -169,21 +170,27 @@ public class YarnApplicationMasterRunner {
 
 			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
 			if(keytabPath != null && remoteKeytabPrincipal != null) {
-				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
-				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 			}
 
-			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
+			org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
 			if(krb5Conf.exists() && krb5Conf.canRead()) {
 				String krb5Path = krb5Conf.getAbsolutePath();
 				LOG.info("KRB5 Conf: {}", krb5Path);
-				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-				sc.setHadoopConfiguration(conf);
+				hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+			}
+
+			SecurityUtils.SecurityConfiguration sc;
+			if(hadoopConfiguration != null) {
+				sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
+			} else {
+				sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
 			}
 
 			SecurityUtils.install(sc);
@@ -256,8 +263,8 @@ public class YarnApplicationMasterRunner {
 				LOG.info("keytabPath: {}", keytabPath);
 			}
 			if(keytabPath != null && remoteKeytabPrincipal != null) {
-				config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
-				config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+				config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+				config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 			}
 
 			// Hadoop/Yarn configuration (loads config data automatically from classpath files)

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 015eb1b..059f1aa 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -111,22 +112,28 @@ public class YarnTaskManagerRunner {
 
 		try {
 
-			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+			org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
 			if(krb5Conf.exists() && krb5Conf.canRead()) {
 				String krb5Path = krb5Conf.getAbsolutePath();
 				LOG.info("KRB5 Conf: {}", krb5Path);
-				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-				sc.setHadoopConfiguration(conf);
+				hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+			}
+
+			SecurityUtils.SecurityConfiguration sc;
+			if(hadoopConfiguration != null) {
+				sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
+			} else {
+				sc = new SecurityUtils.SecurityConfiguration(configuration);
 			}
 
 			if(keytabPath != null && remoteKeytabPrincipal != null) {
-				configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
-				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+				configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+				configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 			}
 
 			SecurityUtils.install(sc);


[5/5] flink git commit: [FLINK-5442] [streaming] Ensure order of enum elements in StateDescriptor.Type through a test

Posted by se...@apache.org.
[FLINK-5442] [streaming] Ensure order of enum elements in StateDescriptor.Type through a test

This closes #3091


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/758ea79a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/758ea79a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/758ea79a

Branch: refs/heads/release-1.2
Commit: 758ea79a1cda1dec58d43266487ce663f7205f86
Parents: 8d3ad45
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Jan 10 22:52:58 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 19:06:10 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/state/StateDescriptor.java     |  3 ++-
 .../runtime/state/SerializationProxiesTest.java     | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/758ea79a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index de3cd4e..ad9d417 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -49,8 +49,9 @@ import static java.util.Objects.requireNonNull;
 @PublicEvolving
 public abstract class StateDescriptor<S extends State, T> implements Serializable {
 
+	// Do not change the order of the elements in this enum, ordinal is used in serialization
 	public enum Type {
-		VALUE, LIST, REDUCING, FOLDING, @Deprecated UNKNOWN
+		@Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING
 	}
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/758ea79a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 9211e92..832b022 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -96,4 +96,20 @@ public class SerializationProxiesTest {
 
 		Assert.assertEquals(name, metaInfo.getStateName());
 	}
+
+	/**
+	 * This test fixes the order of elements in the enum, which is important for serialization. Do not modify this test
+	 * unless you are entirely sure what you are doing.
+	 */
+	@Test
+	public void testFixTypeOrder() {
+		// ensure all elements are covered
+		Assert.assertEquals(5, StateDescriptor.Type.values().length);
+		// fix the order of elements to keep serialization format stable
+		Assert.assertEquals(0, StateDescriptor.Type.UNKNOWN.ordinal());
+		Assert.assertEquals(1, StateDescriptor.Type.VALUE.ordinal());
+		Assert.assertEquals(2, StateDescriptor.Type.LIST.ordinal());
+		Assert.assertEquals(3, StateDescriptor.Type.REDUCING.ordinal());
+		Assert.assertEquals(4, StateDescriptor.Type.FOLDING.ordinal());
+	}
 }
\ No newline at end of file
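
For background: `StateDescriptor` writes the enum by its ordinal, so reordering the constants silently remaps every previously written value on read. A minimal self-contained sketch (not Flink code; a hypothetical `Type` enum mirroring the one above) of the failure mode this test guards against:

```java
import java.io.*;

public class OrdinalSerializationSketch {

	// hypothetical enum mirroring StateDescriptor.Type after this commit
	enum Type { UNKNOWN, VALUE, LIST, REDUCING, FOLDING }

	public static void main(String[] args) throws IOException {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (DataOutputStream out = new DataOutputStream(bytes)) {
			out.writeInt(Type.VALUE.ordinal()); // persists the int 1
		}
		try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
			// if the enum order changed between writer and reader versions,
			// values()[ordinal] would silently resolve to the wrong constant
			Type restored = Type.values()[in.readInt()];
			System.out.println(restored); // VALUE, only because the order is unchanged
		}
	}
}
```

A stream written under a different ordering (e.g. with `VALUE` at ordinal 0) and read under the order above would come back as `UNKNOWN`, which is exactly why the test pins every ordinal.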


[2/5] flink git commit: [FLINK-5427] [docs] Fix code example in event_timestamps_watermarks.md

Posted by se...@apache.org.
[FLINK-5427] [docs] Fix code example in event_timestamps_watermarks.md

This closes #3082


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1750b0dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1750b0dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1750b0dd

Branch: refs/heads/release-1.2
Commit: 1750b0dda8c243a215dcd558baf36ea1053a4310
Parents: 758ea79
Author: fengyelei 00406569 <fe...@huawei.com>
Authored: Mon Jan 9 18:37:59 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 19:06:10 2017 +0100

----------------------------------------------------------------------
 docs/dev/event_timestamps_watermarks.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1750b0dd/docs/dev/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md
index 8d152df..a16aa55 100644
--- a/docs/dev/event_timestamps_watermarks.md
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -304,7 +304,7 @@ public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent
 
 	@Override
 	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
-		return element.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
+		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
 	}
 }
 {% endhighlight %}
@@ -318,7 +318,7 @@ class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
 	}
 
 	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
-		if (element.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
+		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
 	}
 }
 {% endhighlight %}


[4/5] flink git commit: [FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries

Posted by se...@apache.org.
[FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries

Fixes FLINK-5364, FLINK-5361, FLINK-5350, FLINK-5055

This closes #3057


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00193f7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00193f7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00193f7e

Branch: refs/heads/release-1.2
Commit: 00193f7e238340cc18c57a44c7e6377432839373
Parents: 1750b0d
Author: wrighe3 <er...@emc.com>
Authored: Tue Dec 20 01:07:38 2016 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 19:06:10 2017 +0100

----------------------------------------------------------------------
 docs/internals/flink_security.md                | 133 +++++---
 docs/setup/config.md                            |  55 +++-
 .../connectors/fs/RollingSinkSecuredITCase.java |   8 +-
 .../flink/configuration/ConfigConstants.java    |  14 -
 .../configuration/HighAvailabilityOptions.java  |   8 -
 .../flink/configuration/SecurityOptions.java    |  62 ++++
 flink-dist/src/main/resources/flink-conf.yaml   |  21 +-
 .../java/hadoop/mapred/utils/HadoopUtils.java   |  21 ++
 .../MesosApplicationMasterRunner.java           |   2 -
 .../MesosTaskManagerRunner.java                 |   2 -
 .../overlays/KeytabOverlay.java                 |  14 +-
 .../runtime/security/DynamicConfiguration.java  | 111 +++++++
 .../runtime/security/JaasConfiguration.java     | 160 ---------
 .../flink/runtime/security/KerberosUtils.java   | 125 +++++++
 .../flink/runtime/security/SecurityUtils.java   | 322 ++++++++-----------
 .../runtime/security/modules/HadoopModule.java  | 119 +++++++
 .../runtime/security/modules/JaasModule.java    | 146 +++++++++
 .../security/modules/SecurityModule.java        |  59 ++++
 .../security/modules/ZooKeeperModule.java       |  76 +++++
 .../src/main/resources/flink-jaas.conf          |   9 +-
 .../overlays/KeytabOverlayTest.java             |   5 +-
 .../runtime/security/JaasConfigurationTest.java |  52 ---
 .../runtime/security/KerberosUtilsTest.java     |  48 +++
 .../runtime/security/SecurityUtilsTest.java     | 105 +++---
 .../flink/test/util/SecureTestEnvironment.java  |  48 +--
 .../test/util/TestingJaasConfiguration.java     | 106 ------
 .../flink/test/util/TestingSecurityContext.java |  38 +--
 .../yarn/YARNSessionFIFOSecuredITCase.java      |  10 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   5 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  26 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  25 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  21 +-
 32 files changed, 1182 insertions(+), 774 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
index 846273b..a83f3b9 100644
--- a/docs/internals/flink_security.md
+++ b/docs/internals/flink_security.md
@@ -24,64 +24,123 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN) 
-and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers 
-who plans to run Flink on a secure environment.
+This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos), 
+filesystems, connectors, and state backends.
 
 ## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase) 
 
-The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario, 
-streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be  able to authenticate against secure 
-data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the 
-context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster.
+In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure 
+data sources throughout the life of the job.  Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens.   Keep in mind that all jobs share the credential configured for a given cluster.   To use a different keytab
+for a certain job, simply launch a separate Flink cluster with a different configuration.   Numerous Flink clusters may run side-by-side in a YARN
+or Mesos environment.
 
 ## How Flink Security works
-Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. 
-A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security 
-requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolves over a period 
-of time, at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security.
+In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis, etc.), each with its own authentication method (Kerberos, SSL/TLS, username/password, etc.).  While satisfying the security requirements of all connectors is an ongoing effort,
+Flink provides first-class support only for Kerberos authentication.  The following services and connectors are tested for Kerberos authentication:
 
-- Kafka (0.9)
+- Kafka (0.9+)
 - HDFS
+- HBase
 - ZooKeeper
 
-Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation
-(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context.
+Note that it is possible to enable the use of Kerberos independently for each service or connector.  For example, the user may enable 
+Hadoop security without enabling Kerberos for ZooKeeper, or vice versa.    The shared element is the configuration of 
+Kerberos credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup.  The next section describes each security module.
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context.   The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
+
+If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured.  Otherwise,
+the login user conveys only the user identity of the OS account that launched the cluster.
+
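
A minimal sketch (not part of this patch) of inspecting the resulting process-wide login user through Hadoop's `UserGroupInformation`:

```java
import org.apache.hadoop.security.UserGroupInformation;

public class LoginUserSketch {
	public static void main(String[] args) throws Exception {
		// the process-wide login user established at startup
		UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
		// with Hadoop security enabled this reflects the Kerberos principal,
		// otherwise just the OS account that launched the cluster
		System.out.println("login user: " + loginUser.getUserName());
		System.out.println("security enabled: " + UserGroupInformation.isSecurityEnabled());
		System.out.println("kerberos credentials: " + loginUser.hasKerberosCredentials());
	}
}
```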
+### JAAS Security Module
+This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
+Kafka, and other such components that rely on JAAS.
+
+Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html).   Static entries override any
+dynamic entries provided by this module.
+
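
For example, a static JAAS entry of the following shape (hypothetical keytab path and principal) would override the dynamic `Client` entry provided by this module:

```
// static entries take precedence over the dynamic ones
Client {
	com.sun.security.auth.module.Krb5LoginModule required
	useKeyTab=true
	storeKey=true
	keyTab="/path/to/flink.keytab"
	principal="flink-user@EXAMPLE.COM";
};
```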
+### ZooKeeper Security Module
+This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
+and the JAAS login context name (default: `Client`).
+
+## Security Configuration
+
+### Flink Configuration
+The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
 
-Services like Kafka and ZooKeeper use SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. It expects JAAS configuration with a platform-specific login 
-module *name* to be provided. Managing per-connector configuration files will be an overhead and to overcome this requirement, a process-wide JAAS configuration object is 
-instantiated which serves standard ApplicationConfigurationEntry for the connectors that authenticates using SASL/JAAS mechanism.
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
 
-It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself uses UGI's doAS() implementation to run under a specific user context, i.e. if Hadoop security is enabled 
-then the Flink processes will be running under a secure user account or else it will run as the OS login user account who starts the Flink cluster.
+A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
 
-## Security Configurations
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
 
-Secure credentials can be supplied by adding below configuration elements to Flink configuration file:
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
 
-- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret.
+These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context.  Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section).   To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
 
-- `security.principal`: User principal name that the Flink cluster should run as.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
 
-The delegation token mechanism (*kinit cache*) is still supported for backward compatibility but enabling security using *keytab* configuration is the preferred and recommended approach.
+ZooKeeper-related configuration overrides:
 
-## Standalone Mode:
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual authentication between the client (Flink) and server.
+
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
+
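
Putting these options together, a sample `flink-conf.yaml` fragment (hypothetical keytab path and principal) for a keytab-based setup:

```
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
security.kerberos.login.contexts: Client,KafkaClient
zookeeper.sasl.service-name: zookeeper
zookeeper.sasl.login-context-name: Client
```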
+### Hadoop Configuration
+
+The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`).   The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
+
+Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts.   In this scenario, the Flink CLI acquires Hadoop
+delegation tokens (for HDFS and for HBase).
+
+## Deployment Modes
+Here is some information specific to each deployment mode.
+
+### Standalone Mode
 
 Steps to run a secure Flink cluster in standalone/cluster mode:
-- Add security configurations to Flink configuration file (on all cluster nodes) 
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration on all cluster nodes
-- Deploy Flink cluster using cluster start/stop scripts or CLI
+1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
+3. Deploy Flink cluster as normal.
+
+### YARN/Mesos Mode
+
+Steps to run a secure Flink cluster in YARN/Mesos mode:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on the client node.
+3. Deploy Flink cluster as normal.
+
+In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.
 
-## Yarn Mode:
+For more information, see the <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
 
-Steps to run secure Flink cluster in Yarn mode:
-- Add security configurations to Flink configuration file (on the node from where cluster will be provisioned using Flink/Yarn CLI) 
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration
-- Deploy Flink cluster using CLI
+#### Using `kinit` (YARN only)
 
-In Yarn mode, the user supplied keytab will be copied over to the Yarn containers (App Master/JM and TM) as the Yarn local resource file.
-Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">Yarn security</a> 
+In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
+This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it.  The main drawback is
+that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
 
-## Token Renewal
+Steps to run a secure Flink cluster using `kinit`:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Log in using the `kinit` command.
+3. Deploy Flink cluster as normal.
 
-UGI and Kafka/ZK login module implementations takes care of auto-renewing the tickets upon reaching expiry and no further action is needed on the part of Flink.
\ No newline at end of file
+## Further Details
+### Ticket Renewal
+Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
+Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab.  In the delegation token scenario,
+YARN itself renews the token (up to its maximum lifespan).

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 512c08a..3ef4ffd 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -96,37 +96,58 @@ These options are useful for debugging a Flink application for memory and garbag
 
 - `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
 
-### Kerberos
+### Kerberos-based Security
 
-Flink supports Kerberos authentication for the following services
+Flink supports Kerberos authentication for the following services:
 
-+ Hadoop Components: such as HDFS, YARN, or HBase.
++ Hadoop Components (such as HDFS, YARN, or HBase)
 + Kafka Connectors (version 0.9+)
-+ Zookeeper Server/Client
++ Zookeeper
 
-Hadoop components relies on the UserGroupInformation (UGI) implementation to handle Kerberos authentication, whereas Kafka and Zookeeper services handles Kerberos authentication through SASL/JAAS implementation.
-
-**Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
+**Kerberos is supported only in Hadoop version 2.6.1 and above. All
   other versions have critical bugs which might fail the Flink job
   unexpectedly.**
 
-**Ticket cache** and **Keytab** modes are supported for all above mentioned services.
+Configuring Flink for Kerberos security involves three aspects:
+
+1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via `kinit`)
+2. Making the Kerberos credential available to components and connectors as needed
+3. Configuring the component and/or connector to use Kerberos authentication
+
+To provide the cluster with a Kerberos credential, either configure the login keytab using the below configuration options,
+or log in using `kinit` before starting the cluster.
+
+It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues.   If you prefer to use the ticket cache,
+talk to your administrator about increasing the Hadoop delegation token lifetime.
+
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from your Kerberos ticket cache (default: `true`).
+
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
 
-> Ticket cache (Supported only to provide backward compatibility support. Keytab is the preferred approach for long running jobs)
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
 
-While Hadoop uses Kerberos tickets to authenticate users with services initially, the authentication process continues differently afterwards. Instead of saving the ticket to authenticate on a later access, Hadoop creates its own security tokens (DelegationToken) that it passes around. These are authenticated to Kerberos periodically but are independent of the token renewal time. The tokens have a maximum life span identical to the Kerberos ticket maximum life span.
+If Hadoop security is enabled (in `core-site.xml`), Flink will automatically use the configured Kerberos credentials when connecting to HDFS, HBase, and other Hadoop components.
 
-While using ticket cache mode, please make sure to set the maximum ticket life span high long running jobs.
+Make the Kerberos credentials available to any connector or component that uses a JAAS configuration file by configuring JAAS login contexts.
 
-If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication).
+
+You may also provide a static JAAS configuration file, whose entries override those produced by the above configuration option.
+
+Be sure to configure the connector within your Flink program as necessary to use Kerberos authentication.  For the Kafka connector,
+use the following properties:
+
+```
+security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+sasl.kerberos.service.name=kafka
+```
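
As a usage sketch (assuming the Kafka 0.9 connector, a hypothetical broker address, and a topic named `events`), these properties are passed to the consumer like so:

```java
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SecureKafkaReadJob {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "broker:9092");   // hypothetical broker
		props.setProperty("group.id", "flink-group");
		props.setProperty("security.protocol", "SASL_PLAINTEXT");
		props.setProperty("sasl.kerberos.service.name", "kafka");

		env.addSource(new FlinkKafkaConsumer09<>("events", new SimpleStringSchema(), props))
			.print();
		env.execute("secure kafka read");
	}
}
```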
 
-> Keytab (security principal and keytab can be configured through Flink configuration file)
-- `security.keytab`: Path to Keytab file
-- `security.principal`: Principal associated with the keytab
+Flink provides some additional options to configure ZooKeeper security:
 
-Kerberos ticket renewal is abstracted and automatically handled by the Hadoop/Kafka/ZK login modules and ensures that tickets are renewed in time and you can be sure to be authenticated until the end of the ticket life time.
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`).
 
-For Kafka and ZK, process-wide JAAS config will be created using the provided security credentials and the Kerberos authentication will be handled by Kafka/ZK login handlers.
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
 
 ### Other
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index eb12d07..fa46fc7 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SecureTestEnvironment;
@@ -116,13 +117,12 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 		populateSecureConfigurations();
 
 		Configuration flinkConfig = new Configuration();
-		flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
 				SecureTestEnvironment.getTestKeytab());
-		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
 				SecureTestEnvironment.getHadoopServicePrincipal());
 
-		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
-		ctx.setHadoopConfiguration(conf);
+		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig, conf);
 		try {
 			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index eabb754..fc389e0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1395,20 +1395,6 @@ public final class ConfigConstants {
 	/** The environment variable name which contains the Flink installation root directory */
 	public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
 
-	// -------------------------------- Security -------------------------------
-
-	/**
-	 * The config parameter defining security credentials required
-	 * for securing Flink cluster.
-	 */
-
-	/** Keytab file key name to be used in flink configuration file */
-	public static final String SECURITY_KEYTAB_KEY = "security.keytab";
-
-	/** Kerberos security principal key name to be used in flink configuration file */
-	public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
-
-
 	/**
 	 * Not instantiable.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 1ee988a..4792eba 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -124,14 +124,6 @@ public class HighAvailabilityOptions {
 			.defaultValue(3)
 			.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
 
-	public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE = 
-			key("zookeeper.sasl.disable")
-			.defaultValue(true);
-
-	public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME = 
-			key("zookeeper.sasl.service-name")
-			.noDefaultValue();
-
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
new file mode 100644
index 0000000..67d101d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to security.
+ */
+public class SecurityOptions {
+
+	// ------------------------------------------------------------------------
+	//  Kerberos Options
+	// ------------------------------------------------------------------------
+
+	public static final ConfigOption<String> KERBEROS_LOGIN_PRINCIPAL =
+		key("security.kerberos.login.principal")
+			.noDefaultValue()
+			.withDeprecatedKeys("security.principal");
+
+	public static final ConfigOption<String> KERBEROS_LOGIN_KEYTAB =
+		key("security.kerberos.login.keytab")
+			.noDefaultValue()
+			.withDeprecatedKeys("security.keytab");
+
+	public static final ConfigOption<Boolean> KERBEROS_LOGIN_USETICKETCACHE =
+		key("security.kerberos.login.use-ticket-cache")
+			.defaultValue(true);
+
+	public static final ConfigOption<String> KERBEROS_LOGIN_CONTEXTS =
+		key("security.kerberos.login.contexts")
+			.noDefaultValue();
+
+
+	// ------------------------------------------------------------------------
+	//  ZooKeeper Security Options
+	// ------------------------------------------------------------------------
+
+	public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
+		key("zookeeper.sasl.service-name")
+			.defaultValue("zookeeper");
+
+	public static final ConfigOption<String> ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME =
+		key("zookeeper.sasl.login-context-name")
+			.defaultValue("Client");
+}
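
A condensed sketch (assuming the standard `ConfigOption`-based accessors on `Configuration`) of reading these options, including the deprecated-key fallback:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;

public class ReadSecurityOptionsSketch {
	public static void main(String[] args) {
		Configuration config = GlobalConfiguration.loadConfiguration();
		// the deprecated keys "security.keytab" / "security.principal" are still honored
		String keytab = config.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
		String principal = config.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
		boolean useTicketCache = config.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
		System.out.println(keytab + " / " + principal + " / " + useTicketCache);
	}
}
```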

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index c650cfe..f759db6 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -162,21 +162,24 @@ jobmanager.web.port: 8081
 # Flink Cluster Security Configuration (optional configuration)
 #==============================================================================
 
-# Kerberos security for the connectors can be enabled by providing below configurations
-# Security works in two modes - keytab/principal combination or using the Kerberos token cache
-# If keytab and principal are not provided, token cache (manual kinit) will be used
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
 
-#security.keytab: /path/to/kerberos/keytab
-#security.principal: flink-user
+#security.kerberos.login.keytab: /path/to/kerberos/keytab
+#security.kerberos.login.principal: flink-user
+#security.kerberos.login.use-ticket-cache: true
+
+#security.kerberos.login.contexts: Client,KafkaClient
 
 #==============================================================================
 # ZK Security Configuration (optional configuration)
 #==============================================================================
-# Below configurations are applicable if ZK quorum is configured for Kerberos security
 
-# SASL authentication is disabled by default and can be enabled by changig the value to false
-#
-# zookeeper.sasl.disable: true
+# The configurations below are applicable if the ZK ensemble is configured for security
 
 # Override below configuration to provide custom ZK service name if configured
 #

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index 7c41eaf..da8244f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -21,17 +21,22 @@ package org.apache.flink.api.java.hadoop.mapred.utils;
 
 import java.io.File;
 import java.lang.reflect.Constructor;
+import java.util.Collection;
 import java.util.Map;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +48,8 @@ public final class HadoopUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
 
+	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
 	/**
 	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
 	 */
@@ -163,6 +170,20 @@ public final class HadoopUtils {
 	}
 
 	/**
+	 * Indicates whether the current user has an HDFS delegation token.
+	 */
+	public static boolean hasHDFSDelegationToken() throws Exception {
+		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+		for (Token<? extends TokenIdentifier> token : usrTok) {
+			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
 	 * Private constructor to prevent instantiation.
 	 */
 	private HadoopUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 4b9bd82..689c26a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -27,7 +27,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -168,7 +167,6 @@ public class MesosApplicationMasterRunner {
 
 			// configure security
 			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
-			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
 			SecurityUtils.install(sc);
 
 			// run the actual work in the installed security context

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 75b5043..206c71b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -26,7 +26,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -118,7 +117,6 @@ public class MesosTaskManagerRunner {
 
 		// Run the TM in the security context
 		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
-		sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
 		SecurityUtils.install(sc);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
index 7fe5b3e..271b32d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework.overlays;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.slf4j.Logger;
@@ -34,7 +34,7 @@ import java.io.IOException;
  * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container.
  *
 * The following Flink configuration entries are updated:
- *  - security.keytab
+ *  - security.kerberos.login.keytab
  */
 public class KeytabOverlay extends AbstractContainerOverlay {
 
@@ -60,7 +60,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
 				.setDest(TARGET_PATH)
 				.setCachable(false)
 				.build());
-			container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath());
+			container.getDynamicConfiguration().setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, TARGET_PATH.getPath());
 		}
 	}
 
@@ -69,7 +69,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
 	}
 
 	/**
-	 * A builder for the {@link HadoopUserOverlay}.
+	 * A builder for the {@link KeytabOverlay}.
 	 */
 	public static class Builder {
 
@@ -79,15 +79,15 @@ public class KeytabOverlay extends AbstractContainerOverlay {
 		 * Configures the overlay using the current environment (and global configuration).
 		 *
 		 * The following Flink configuration settings are checked for a keytab:
-		 *  - security.keytab
+		 *  - security.kerberos.login.keytab
 		 */
 		public Builder fromEnvironment(Configuration globalConfiguration) {
-			String keytab = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+			String keytab = globalConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
 			if(keytab != null) {
 				keytabPath = new File(keytab);
 				if(!keytabPath.exists()) {
 					throw new IllegalStateException("Invalid configuration for " +
-						ConfigConstants.SECURITY_KEYTAB_KEY +
+						SecurityOptions.KERBEROS_LOGIN_KEYTAB +
 						"; '" + keytab + "' not found.");
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
new file mode 100644
index 0000000..6af4f23
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A dynamic JAAS configuration.
+ *
+ * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
+ * an (optional) underlying configuration.   Entries from the underlying configuration take
+ * precedence over dynamic entries.
+ */
+public class DynamicConfiguration extends Configuration {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(DynamicConfiguration.class);
+
+	private final Configuration delegate;
+
+	private final Map<String,AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
+
+	/**
+	 * Create a dynamic configuration.
+	 * @param delegate an underlying configuration to delegate to, or null.
+     */
+	public DynamicConfiguration(@Nullable Configuration delegate) {
+		this.delegate = delegate;
+	}
+
+	/**
+	 * Add entries for the given application name.
+     */
+	public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) {
+		final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+		final AppConfigurationEntry[] updated;
+		if(existing == null) {
+			updated = Arrays.copyOf(entry, entry.length);
+		}
+		else {
+			updated = merge(existing, entry);
+		}
+		dynamicEntries.put(name, updated);
+	}
+
+	/**
+	 * Retrieve the AppConfigurationEntries for the specified <i>name</i>
+	 * from this Configuration.
+	 *
+	 * <p>
+	 *
+	 * @param name the name used to index the Configuration.
+	 *
+	 * @return an array of AppConfigurationEntries for the specified <i>name</i>
+	 *          from this Configuration, or null if there are no entries
+	 *          for the specified <i>name</i>
+	 */
+	@Override
+	public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+		AppConfigurationEntry[] entry = null;
+		if(delegate != null) {
+			entry = delegate.getAppConfigurationEntry(name);
+		}
+		final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+		if(existing != null) {
+			if(entry != null) {
+				entry = merge(entry, existing);
+			}
+			else {
+				entry = Arrays.copyOf(existing, existing.length);
+			}
+		}
+		return entry;
+	}
+
+	private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) {
+		AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length);
+		System.arraycopy(b, 0, merged, a.length, b.length);
+		return merged;
+	}
+
+	@Override
+	public void refresh() {
+		if(delegate != null) {
+			delegate.refresh();
+		}
+	}
+}
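
A usage sketch for this class (hypothetical Krb5 login-module options; in this patch the real entries are built by `KerberosUtils`):

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import org.apache.flink.runtime.security.DynamicConfiguration;

public class DynamicConfigurationSketch {
	public static void main(String[] args) {
		// wrap whatever static JAAS configuration is already in effect
		DynamicConfiguration conf = new DynamicConfiguration(Configuration.getConfiguration());

		Map<String, String> options = new HashMap<>();
		options.put("useTicketCache", "true");   // hypothetical option set
		AppConfigurationEntry entry = new AppConfigurationEntry(
			"com.sun.security.auth.module.Krb5LoginModule",
			AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
			options);

		// dynamically register a "Client" login context (used e.g. by the ZooKeeper client)
		conf.addAppConfigurationEntry("Client", entry);
		Configuration.setConfiguration(conf);
	}
}
```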

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
deleted file mode 100644
index c4527dd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.annotation.Internal;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- *
- * JAAS configuration provider object that provides default LoginModule for various connectors that supports
- * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class.
- *
- * Different connectors uses different login module name to implement JAAS based authentication support.
- * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the
- * name to be "client". This sets responsibility on the Flink cluster administrator to configure/provide right
- * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides
- * a standard lookup to get the login module entry for the JAAS based authentication to work.
- *
- * HDFS connector will not be impacted with this configuration since it uses UGI based mechanism to authenticate.
- *
- * <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html">Configuration</a>
- *
- */
-
-@Internal
-public class JaasConfiguration extends Configuration {
-
-	private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class);
-
-	public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
-
-	public static final boolean IBM_JAVA;
-
-	private static final Map<String, String> debugOptions = new HashMap<>();
-
-	private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
-
-	private static final Map<String, String> keytabKerberosOptions = new HashMap<>();
-
-	private static final AppConfigurationEntry userKerberosAce;
-
-	private AppConfigurationEntry keytabKerberosAce = null;
-
-	static {
-
-		IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
-
-		if(LOG.isDebugEnabled()) {
-			debugOptions.put("debug", "true");
-		}
-
-		if(IBM_JAVA) {
-			kerberosCacheOptions.put("useDefaultCcache", "true");
-		} else {
-			kerberosCacheOptions.put("doNotPrompt", "true");
-			kerberosCacheOptions.put("useTicketCache", "true");
-		}
-
-		String ticketCache = System.getenv("KRB5CCNAME");
-		if(ticketCache != null) {
-			if(IBM_JAVA) {
-				System.setProperty("KRB5CCNAME", ticketCache);
-			} else {
-				kerberosCacheOptions.put("ticketCache", ticketCache);
-			}
-		}
-
-		kerberosCacheOptions.put("renewTGT", "true");
-		kerberosCacheOptions.putAll(debugOptions);
-
-		userKerberosAce = new AppConfigurationEntry(
-				KerberosUtil.getKrb5LoginModuleName(),
-				AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
-				kerberosCacheOptions);
-
-	}
-
-	protected JaasConfiguration(String keytab, String principal) {
-
-		LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal);
-
-		if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
-				(!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal))){
-			throw new RuntimeException("Both keytab and principal are required and cannot be empty");
-		}
-
-		if(!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
-
-			if(IBM_JAVA) {
-				keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
-				keytabKerberosOptions.put("credsType", "both");
-			} else {
-				keytabKerberosOptions.put("keyTab", keytab);
-				keytabKerberosOptions.put("doNotPrompt", "true");
-				keytabKerberosOptions.put("useKeyTab", "true");
-				keytabKerberosOptions.put("storeKey", "true");
-			}
-
-			keytabKerberosOptions.put("principal", principal);
-			keytabKerberosOptions.put("refreshKrb5Config", "true");
-			keytabKerberosOptions.putAll(debugOptions);
-
-			keytabKerberosAce = new AppConfigurationEntry(
-					KerberosUtil.getKrb5LoginModuleName(),
-					AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-					keytabKerberosOptions);
-		}
-	}
-
-	public static Map<String, String> getKeytabKerberosOptions() {
-		return keytabKerberosOptions;
-	}
-
-	private static String prependFileUri(String keytabPath) {
-		File f = new File(keytabPath);
-		return f.toURI().toString();
-	}
-
-	@Override
-	public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
-
-		LOG.debug("JAAS configuration requested for the application entry: {}", applicationName);
-
-		AppConfigurationEntry[] appConfigurationEntry;
-
-		if(keytabKerberosAce != null) {
-			appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce};
-		} else {
-			appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce};
-		}
-
-		return appConfigurationEntry;
-	}
-
-}
\ No newline at end of file
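
For reference, the removed class answered every JAAS application name with the same Kerberos
entries. A static JAAS file with roughly the same effect as its keytab path would look like the
sketch below (the keytab location and principal are placeholders; the login module name is what
KerberosUtil.getKrb5LoginModuleName() returns on Oracle/OpenJDK):

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        doNotPrompt=true
        refreshKrb5Config=true
        keyTab="/path/to/flink.keytab"
        principal="flink-user@EXAMPLE.COM";
    };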

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
new file mode 100644
index 0000000..7ef9187
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides vendor-specific Kerberos {@link AppConfigurationEntry} instances.
+ *
+ * The implementation is inspired by the Hadoop UGI class.
+ */
+@Internal
+public class KerberosUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);
+
+	private static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
+
+	private static final boolean IBM_JAVA;
+
+	private static final Map<String, String> debugOptions = new HashMap<>();
+
+	private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
+
+	private static final AppConfigurationEntry userKerberosAce;
+
+	static {
+
+		IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
+
+		if(LOG.isDebugEnabled()) {
+			debugOptions.put("debug", "true");
+		}
+
+		if(IBM_JAVA) {
+			kerberosCacheOptions.put("useDefaultCcache", "true");
+		} else {
+			kerberosCacheOptions.put("doNotPrompt", "true");
+			kerberosCacheOptions.put("useTicketCache", "true");
+		}
+
+		String ticketCache = System.getenv("KRB5CCNAME");
+		if(ticketCache != null) {
+			if(IBM_JAVA) {
+				System.setProperty("KRB5CCNAME", ticketCache);
+			} else {
+				kerberosCacheOptions.put("ticketCache", ticketCache);
+			}
+		}
+
+		kerberosCacheOptions.put("renewTGT", "true");
+		kerberosCacheOptions.putAll(debugOptions);
+
+		userKerberosAce = new AppConfigurationEntry(
+				KerberosUtil.getKrb5LoginModuleName(),
+				AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
+				kerberosCacheOptions);
+
+	}
+
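+	/**
+	 * Returns a JAAS entry that authenticates with any Kerberos credentials
+	 * already present in the user's ticket cache.
+	 */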
+	public static AppConfigurationEntry ticketCacheEntry() {
+		return userKerberosAce;
+	}
+
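+	/**
+	 * Returns a JAAS entry that authenticates with the given keytab file and principal.
+	 */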
+	public static AppConfigurationEntry keytabEntry(String keytab, String principal) {
+
+		checkNotNull(keytab, "keytab");
+		checkNotNull(principal, "principal");
+
+		Map<String, String> keytabKerberosOptions = new HashMap<>();
+
+		if(IBM_JAVA) {
+			keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
+			keytabKerberosOptions.put("credsType", "both");
+		} else {
+			keytabKerberosOptions.put("keyTab", keytab);
+			keytabKerberosOptions.put("doNotPrompt", "true");
+			keytabKerberosOptions.put("useKeyTab", "true");
+			keytabKerberosOptions.put("storeKey", "true");
+		}
+
+		keytabKerberosOptions.put("principal", principal);
+		keytabKerberosOptions.put("refreshKrb5Config", "true");
+		keytabKerberosOptions.putAll(debugOptions);
+
+		AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
+				KerberosUtil.getKrb5LoginModuleName(),
+				AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+				keytabKerberosOptions);
+
+		return keytabKerberosAce;
+	}
+
+	private static String prependFileUri(String keytabPath) {
+		File f = new File(keytabPath);
+		return f.toURI().toString();
+	}
+}
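
As an illustration (hypothetical caller, not part of this commit), the two factory methods can
back a process-wide JAAS configuration, where Configuration below is
javax.security.auth.login.Configuration:

    // Sketch: serve the Kerberos entries for every JAAS application name.
    // The keytab path and principal are placeholders.
    Configuration jaasConfig = new Configuration() {
        @Override
        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
            return new AppConfigurationEntry[] {
                KerberosUtils.keytabEntry("/path/to/flink.keytab", "flink-user@EXAMPLE.COM"),
                KerberosUtils.ticketCacheEntry()   // fall back to the ticket cache
            };
        }
    };
    Configuration.setConfiguration(jaasConfig);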

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index d7fc6ff..d76e7a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -18,216 +18,162 @@
 
 package org.apache.flink.runtime.security;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.security.modules.JaasModule;
+import org.apache.flink.runtime.security.modules.SecurityModule;
+import org.apache.flink.runtime.security.modules.ZooKeeperModule;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Method;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /*
- * Utils for configuring security. The following security mechanism are supported:
+ * Utils for configuring security. The following security subsystems are supported:
  *
  * 1. Java Authentication and Authorization Service (JAAS)
  * 2. Hadoop's User Group Information (UGI)
+ * 3. ZooKeeper's process-wide security settings.
  */
 public class SecurityUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
 
-	public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
-
-	public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
-
-	private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
-
-	private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
-
 	private static SecurityContext installedContext = new NoOpSecurityContext();
 
+	private static List<SecurityModule> installedModules = null;
+
 	public static SecurityContext getInstalledContext() { return installedContext; }
 
+	@VisibleForTesting
+	static List<SecurityModule> getInstalledModules() {
+		return installedModules;
+	}
+
 	/**
-	 * Performs a static initialization of the JAAS and Hadoop UGI security mechanism.
-	 * It creates the in-memory JAAS configuration object which will serve appropriate
-	 * ApplicationConfigurationEntry for the connector login module implementation that
-	 * authenticates Kerberos identity using SASL/JAAS based mechanism.
+	 * Installs a process-wide security configuration.
+	 *
+	 * Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
 	 */
 	public static void install(SecurityConfiguration config) throws Exception {
 
-		if (!config.securityIsEnabled()) {
-			// do not perform any initialization if no Kerberos crendetails are provided
-			return;
+		// install the security modules
+		List<SecurityModule> modules = new ArrayList<>();
+		try {
+			for (Class<? extends SecurityModule> moduleClass : config.getSecurityModules()) {
+				SecurityModule module = moduleClass.newInstance();
+				module.install(config);
+				modules.add(module);
+			}
 		}
+		catch(Exception ex) {
+			throw new Exception("unable to establish the security context", ex);
+		}
+		installedModules = modules;
 
-		// establish the JAAS config
-		JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
-		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-		populateSystemSecurityProperties(config.flinkConf);
-
-		// establish the UGI login user
-		UserGroupInformation.setConfiguration(config.hadoopConf);
-
-		// only configure Hadoop security if we have security enabled
-		if (UserGroupInformation.isSecurityEnabled()) {
-
-			final UserGroupInformation loginUser;
-
-			if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
-				String keytabPath = (new File(config.keytab)).getAbsolutePath();
-
-				UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-				loginUser = UserGroupInformation.getLoginUser();
-
-				// supplement with any available tokens
-				String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-				if (fileLocation != null) {
-				/*
-				 * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
-				 * used in the context of reading the stored tokens from UGI.
-				 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-				 * loginUser.addCredentials(cred);
-				*/
-					try {
-						Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-							File.class, org.apache.hadoop.conf.Configuration.class);
-						Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-							config.hadoopConf);
-						Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
-							Credentials.class);
-						addCredentialsMethod.invoke(loginUser, cred);
-					} catch (NoSuchMethodException e) {
-						LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
-					}
-				}
-			} else {
-				// login with current user credentials (e.g. ticket cache)
+		// install a security context
+		// use the Hadoop login user as the subject of the installed security context
+		if (!(installedContext instanceof NoOpSecurityContext)) {
+			LOG.warn("overriding previous security context");
+		}
+		UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+		installedContext = new HadoopSecurityContext(loginUser);
+	}
+
+	static void uninstall() {
+		if(installedModules != null) {
+			for (SecurityModule module : Lists.reverse(installedModules)) {
 				try {
-					//Use reflection API to get the login user object
-					//UserGroupInformation.loginUserFromSubject(null);
-					Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-					Subject subject = null;
-					loginUserFromSubjectMethod.invoke(null, subject);
-				} catch (NoSuchMethodException e) {
-					LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+					module.uninstall();
 				}
-
-				// note that the stored tokens are read automatically
-				loginUser = UserGroupInformation.getLoginUser();
-			}
-
-			LOG.info("Hadoop user set to {}", loginUser.toString());
-
-			boolean delegationToken = false;
-			final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
-			Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
-			for (Token<? extends TokenIdentifier> token : usrTok) {
-				final Text id = new Text(token.getIdentifier());
-				LOG.debug("Found user token " + id + " with " + token);
-				if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
-					delegationToken = true;
+				catch(UnsupportedOperationException ignored) {
 				}
-			}
-
-			if (!loginUser.hasKerberosCredentials()) {
-				//throw an error in non-yarn deployment if kerberos cache is not available
-				if (!delegationToken) {
-					LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
-					throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+				catch(SecurityModule.SecurityInstallException e) {
+					LOG.warn("unable to uninstall a security module", e);
 				}
 			}
-
-			if (!(installedContext instanceof NoOpSecurityContext)) {
-				LOG.warn("overriding previous security context");
-			}
-
-			installedContext = new HadoopSecurityContext(loginUser);
+			installedModules = null;
 		}
-	}
 
-	static void clearContext() {
 		installedContext = new NoOpSecurityContext();
 	}
 
-	/*
-	 * This method configures some of the system properties that are require for ZK and Kafka SASL authentication
-	 * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
-	 * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-	 * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and
-	 * Kafka current code behavior.
+	/**
+	 * The global security configuration.
+	 *
+	 * See {@link SecurityOptions} for corresponding configuration options.
 	 */
-	private static void populateSystemSecurityProperties(Configuration configuration) {
-		Preconditions.checkNotNull(configuration, "The supplied configuration was null");
+	public static class SecurityConfiguration {
 
-		boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
+		private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
+			Arrays.asList(HadoopModule.class, JaasModule.class, ZooKeeperModule.class));
 
-		if (disableSaslClient) {
-			LOG.info("SASL client auth for ZK will be disabled");
-			//SASL auth is disabled by default but will be enabled if specified in configuration
-			System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
-			return;
-		}
+		private final List<Class<? extends SecurityModule>> securityModules;
 
-		// load Jaas config file to initialize SASL
-		final File jaasConfFile;
-		try {
-			Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
-			InputStream jaasConfStream = SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
-			Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
-			jaasConfFile = jaasConfPath.toFile();
-			jaasConfFile.deleteOnExit();
-			jaasConfStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
-				"locate pseudo Jaas config provided with Flink", e);
-		}
+		private final org.apache.hadoop.conf.Configuration hadoopConf;
 
-		LOG.info("Enabling {} property with pseudo JAAS config file: {}",
-				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
+		private final boolean useTicketCache;
 
-		//ZK client module lookup the configuration to handle SASL.
-		//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
-		System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
+		private final String keytab;
 
-		String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
-		if (!StringUtils.isBlank(zkSaslServiceName)) {
-			LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
-			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName);
-		}
+		private final String principal;
 
-	}
+		private final List<String> loginContextNames;
 
-	/**
-	 * Inputs for establishing the security context.
-	 */
-	public static class SecurityConfiguration {
+		private final String zkServiceName;
 
-		private Configuration flinkConf;
+		private final String zkLoginContextName;
 
-		private org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+		/**
+		 * Create a security configuration from the global configuration.
+		 * @param flinkConf the Flink global configuration.
+		 */
+		public SecurityConfiguration(Configuration flinkConf) {
+			this(flinkConf, HadoopUtils.getHadoopConfiguration());
+		}
 
-		private String keytab;
+		/**
+		 * Create a security configuration from the global configuration.
+		 * @param flinkConf the Flink global configuration.
+		 * @param hadoopConf the Hadoop configuration.
+		 */
+		public SecurityConfiguration(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) {
+			this(flinkConf, hadoopConf, DEFAULT_MODULES);
+		}
 
-		private String principal;
+		/**
+		 * Create a security configuration from the global configuration.
+		 * @param flinkConf the Flink global configuration.
+		 * @param hadoopConf the Hadoop configuration.
+		 * @param securityModules the security modules to apply.
+		 */
+		public SecurityConfiguration(Configuration flinkConf,
+				org.apache.hadoop.conf.Configuration hadoopConf,
+				List<? extends Class<? extends SecurityModule>> securityModules) {
+			this.hadoopConf = checkNotNull(hadoopConf);
+			this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
+			this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
+			this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+			this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
+			this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+			this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
+			this.securityModules = Collections.unmodifiableList(securityModules);
+
+			validate();
+		}
 
 		public String getKeytab() {
 			return keytab;
@@ -237,48 +183,50 @@ public class SecurityUtils {
 			return principal;
 		}
 
-		public SecurityConfiguration(Configuration flinkConf) {
-			this.flinkConf = flinkConf;
+		public boolean useTicketCache() {
+			return useTicketCache;
+		}
 
-			String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
-			String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
-			validate(keytab, principal);
+		public org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+			return hadoopConf;
+		}
 
-			this.keytab = keytab;
-			this.principal = principal;
+		public List<Class<? extends SecurityModule>> getSecurityModules() {
+			return securityModules;
 		}
 
-		public SecurityConfiguration setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
-			this.hadoopConf = conf;
-			return this;
+		public List<String> getLoginContextNames() {
+			return loginContextNames;
 		}
 
-		private void validate(String keytab, String principal) {
-			LOG.debug("keytab {} and principal {} .", keytab, principal);
+		public String getZooKeeperServiceName() {
+			return zkServiceName;
+		}
 
-			if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
-					!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal)) {
-				if(StringUtils.isBlank(keytab)) {
-					LOG.warn("Keytab is null or empty");
-				}
+		public String getZooKeeperLoginContextName() {
+			return zkLoginContextName;
+		}
+
+		private void validate() {
+			if(!StringUtils.isBlank(keytab)) {
+				// principal is required
 				if(StringUtils.isBlank(principal)) {
-					LOG.warn("Principal is null or empty");
+					throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab requires a principal.");
 				}
-				throw new RuntimeException("Requires both keytab and principal to be provided");
-			}
 
-			if(!StringUtils.isBlank(keytab)) {
+				// check the keytab is readable
 				File keytabFile = new File(keytab);
-				if(!keytabFile.exists() || !keytabFile.isFile()) {
-					LOG.warn("Not a valid keytab: {} file", keytab);
-					throw new RuntimeException("Invalid keytab file: " + keytab + " passed");
+				if(!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
+					throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab is unreadable");
 				}
 			}
-
 		}
 
-		public boolean securityIsEnabled() {
-			return keytab != null && principal != null;
+		private static List<String> parseList(String value) {
+			if(value == null || value.trim().isEmpty()) {
+				return Collections.emptyList();
+			}
+			// split on commas, tolerating surrounding whitespace
+			return Arrays.asList(value.trim().split("\\s*,\\s*"));
 		}
 	}
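
To tie the pieces together, a minimal caller might look like the sketch below (assumptions: a
populated flink-conf, and a SecurityContext interface exposing a runSecured(Callable) method;
imports and the Callable body are elided placeholders):

    // Sketch: install the security modules, then run code as the login user.
    Configuration flinkConf = GlobalConfiguration.loadConfiguration();
    SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(flinkConf);
    SecurityUtils.install(sc);
    SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            // ... start the JobManager / TaskManager here ...
            return null;
        }
    });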
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
new file mode 100644
index 0000000..9344faf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Responsible for installing a Hadoop login user.
+ */
+public class HadoopModule implements SecurityModule {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);
+
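+	/** The Hadoop login user established by {@link #install(SecurityUtils.SecurityConfiguration)}. */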
+	UserGroupInformation loginUser;
+
+	@Override
+	public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+		UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
+
+		try {
+			if (UserGroupInformation.isSecurityEnabled() &&
+				!StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
+				String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
+
+				UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
+
+				loginUser = UserGroupInformation.getLoginUser();
+
+				// supplement with any available tokens
+				String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+				if (fileLocation != null) {
+					/*
+					 * Use the reflection API, since these methods are not available in the Hadoop1 profile.
+					 * The calls below read the stored tokens into the UGI and are equivalent to:
+					 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+					 * loginUser.addCredentials(cred);
+					 */
+					try {
+						Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+							File.class, org.apache.hadoop.conf.Configuration.class);
+						Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
+							securityConfig.getHadoopConfiguration());
+						Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
+							Credentials.class);
+						addCredentialsMethod.invoke(loginUser, cred);
+					} catch (NoSuchMethodException e) {
+						LOG.warn("Could not find method implementations in the shaded jar.", e);
+					} catch (InvocationTargetException e) {
+						throw e.getTargetException();
+					}
+				}
+			} else {
+				// login with current user credentials (e.g. ticket cache, OS login)
+				// note that the stored tokens are read automatically
+				try {
+					//Use reflection API to get the login user object
+					// Use the reflection API to log in from the current subject; equivalent to:
+					// UserGroupInformation.loginUserFromSubject(null);
+					loginUserFromSubjectMethod.invoke(null, (Subject) null);
+				} catch (NoSuchMethodException e) {
+					LOG.warn("Could not find method implementations in the shaded jar.", e);
+				} catch (InvocationTargetException e) {
+					throw e.getTargetException();
+				}
+
+				loginUser = UserGroupInformation.getLoginUser();
+			}
+
+			if (UserGroupInformation.isSecurityEnabled()) {
+				// note: UGI::hasKerberosCredentials inaccurately reports false
+				// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+				// so we check only in ticket cache scenario.
+				if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) {
+					// a delegation token is an adequate substitute in most cases
+					if (!HadoopUtils.hasHDFSDelegationToken()) {
+						LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials");
+					}
+				}
+			}
+
+			LOG.info("Hadoop user set to {}", loginUser);
+
+		} catch (Throwable ex) {
+			throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
+		}
+	}
+
+	@Override
+	public void uninstall() throws SecurityInstallException {
+		throw new UnsupportedOperationException();
+	}
+}
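
For readers tracing the reflection above: the commented-out calls show the intent, and on a
Hadoop 2 classpath the keytab branch reduces to the direct calls below (sketch; principal,
keytabPath, fileLocation and the Hadoop configuration are as in the module):

    UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
    // supplement the login with any externally provided tokens
    Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), hadoopConf);
    loginUser.addCredentials(cred);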

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
new file mode 100644
index 0000000..f8b9bdf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Responsible for installing a process-wide JAAS configuration.
+ * <p>
+ * The installed configuration combines login modules based on:
+ * - the user-supplied JAAS configuration file, if any
+ * - a Kerberos keytab, if configured
+ * - any cached Kerberos credentials from the current environment
+ * <p>
+ * The module also installs a default JAAS config file (if necessary) for
+ * compatibility with ZK and Kafka.  Note that the JRE actually draws on numerous file locations.
+ * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
+ * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+ */
+@Internal
+public class JaasModule implements SecurityModule {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class);
+
+	static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+	static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
+
+	private String priorConfigFile;
+	private javax.security.auth.login.Configuration priorConfig;
+
+	private DynamicConfiguration currentConfig;
+
+	@Override
+	public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+		// ensure that a config file is always defined, for compatibility with
+		// ZK and Kafka which check for the system property and existence of the file
+		priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
+		if (priorConfigFile == null) {
+			File configFile = generateDefaultConfigFile();
+			System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
+		}
+
+		// read the JAAS configuration file
+		priorConfig = javax.security.auth.login.Configuration.getConfiguration();
+
+		// construct a dynamic JAAS configuration
+		currentConfig = new DynamicConfiguration(priorConfig);
+
+		// wire up the configured JAAS login contexts to use the krb5 entries
+		AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
+		if(krb5Entries != null) {
+			for (String app : securityConfig.getLoginContextNames()) {
+				currentConfig.addAppConfigurationEntry(app, krb5Entries);
+			}
+		}
+
+		javax.security.auth.login.Configuration.setConfiguration(currentConfig);
+	}
+
+	@Override
+	public void uninstall() throws SecurityInstallException {
+		if(priorConfigFile != null) {
+			System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
+		} else {
+			System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
+		}
+		javax.security.auth.login.Configuration.setConfiguration(priorConfig);
+	}
+
+	public DynamicConfiguration getCurrentConfiguration() {
+		return currentConfig;
+	}
+
+	private static AppConfigurationEntry[] getAppConfigurationEntries(SecurityUtils.SecurityConfiguration securityConfig) {
+
+		AppConfigurationEntry userKerberosAce = null;
+		if (securityConfig.useTicketCache()) {
+			userKerberosAce = KerberosUtils.ticketCacheEntry();
+		}
+		AppConfigurationEntry keytabKerberosAce = null;
+		if (securityConfig.getKeytab() != null) {
+			keytabKerberosAce = KerberosUtils.keytabEntry(securityConfig.getKeytab(), securityConfig.getPrincipal());
+		}
+
+		AppConfigurationEntry[] appConfigurationEntry;
+		if (userKerberosAce != null && keytabKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce, userKerberosAce};
+		} else if (keytabKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce};
+		} else if (userKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[]{userKerberosAce};
+		} else {
+			return null;
+		}
+
+		return appConfigurationEntry;
+	}
+
+	/**
+	 * Generate the default JAAS config file.
+	 */
+	private static File generateDefaultConfigFile() {
+		final File jaasConfFile;
+		try {
+			Path jaasConfPath = Files.createTempFile("jaas-", ".conf");
+			try (InputStream resourceStream = JaasModule.class.getClassLoader().getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
+				Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+			}
+			jaasConfFile = jaasConfPath.toFile();
+			jaasConfFile.deleteOnExit();
+		} catch (IOException e) {
+			throw new RuntimeException("unable to generate a JAAS configuration file", e);
+		}
+		return jaasConfFile;
+	}
+}
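
The login contexts wired up above come from the Flink configuration. Programmatically, the
relevant options might be set as in this sketch (values are placeholders; assumes the
ConfigOption-based setters on Configuration):

    Configuration flinkConf = new Configuration();
    flinkConf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, "/path/to/flink.keytab");
    flinkConf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "flink-user@EXAMPLE.COM");
    // JAAS application names to back with the Kerberos entries, e.g. ZooKeeper and Kafka
    flinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Client,KafkaClient");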

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
new file mode 100644
index 0000000..fbe1db9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+import java.security.GeneralSecurityException;
+
+/**
+ * An installable security module.
+ */
+public interface SecurityModule {
+
+	/**
+	 * Install the security module.
+	 *
+	 * @param configuration the security configuration.
+	 * @throws SecurityInstallException if the security module couldn't be installed.
+	 */
+	void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException;
+
+	/**
+	 * Uninstall the security module.
+	 *
+	 * @throws SecurityInstallException if the security module couldn't be uninstalled.
+	 * @throws UnsupportedOperationException if the security module doesn't support uninstallation.
+	 */
+	void uninstall() throws SecurityInstallException;
+
+	/**
+	 * Indicates a problem with installing or uninstalling a security module.
+	 */
+	class SecurityInstallException extends GeneralSecurityException {
+		private static final long serialVersionUID = 1L;
+
+		public SecurityInstallException(String msg) {
+			super(msg);
+		}
+
+		public SecurityInstallException(String msg, Throwable cause) {
+			super(msg, cause);
+		}
+	}
+}
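
As an illustration of the contract (hypothetical, not part of this commit), a module that
installs nothing and cannot be rolled back:

    public class NoOpSecurityModule implements SecurityModule {
        @Override
        public void install(SecurityUtils.SecurityConfiguration configuration) {
            // nothing to install
        }
        @Override
        public void uninstall() {
            // signal that this module does not support uninstallation
            throw new UnsupportedOperationException();
        }
    }

Note that SecurityUtils instantiates modules via Class#newInstance, so implementations need a
public no-argument constructor.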

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
new file mode 100644
index 0000000..c0ba4a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+/**
+ * Responsible for installing a process-wide ZooKeeper security configuration.
+ */
+public class ZooKeeperModule implements SecurityModule {
+
+	/**
+	 * A system property for setting whether ZK uses SASL.
+	 */
+	private static final String ZK_ENABLE_CLIENT_SASL = "zookeeper.sasl.client";
+
+	/**
+	 * A system property for setting the expected ZooKeeper service name.
+	 */
+	private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+	/**
+	 * A system property for setting the login context name to use.
+	 */
+	private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
+
+	private String priorServiceName;
+
+	private String priorLoginContextName;
+
+	@Override
+	public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+
+		priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
+		if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
+			System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
+		}
+
+		priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
+		if (!"Client".equals(configuration.getZooKeeperLoginContextName())) {
+			System.setProperty(ZK_LOGIN_CONTEXT_NAME, configuration.getZooKeeperLoginContextName());
+		}
+	}
+
+	@Override
+	public void uninstall() throws SecurityInstallException {
+		if(priorServiceName != null) {
+			System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
+		} else {
+			System.clearProperty(ZK_SASL_CLIENT_USERNAME);
+		}
+		if(priorLoginContextName != null) {
+			System.setProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
+		} else {
+			System.clearProperty(ZK_LOGIN_CONTEXT_NAME);
+		}
+	}
+
+}
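
The net effect of install() for a non-default configuration is just two system properties;
expressed directly (sketch, with placeholder values):

    System.setProperty("zookeeper.sasl.client.username", "zk-custom-service");
    System.setProperty("zookeeper.sasl.clientconfig", "ZookeeperClient");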