You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/11 18:06:52 UTC
[1/5] flink git commit: [FLINK-5364] [security] Fix documentation
setup for Kerberos
Repository: flink
Updated Branches:
refs/heads/release-1.2 8d3ad4515 -> 699f4b05b
[FLINK-5364] [security] Fix documentation setup for Kerberos
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/699f4b05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/699f4b05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/699f4b05
Branch: refs/heads/release-1.2
Commit: 699f4b05b36de49b6892f0cc1222a5a59179b407
Parents: 00193f7
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jan 11 14:32:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 19:06:10 2017 +0100
----------------------------------------------------------------------
docs/internals/flink_security.md | 146 ----------------------------------
docs/ops/security-kerberos.md | 145 +++++++++++++++++++++++++++++++++
2 files changed, 145 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/699f4b05/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
deleted file mode 100644
index a83f3b9..0000000
--- a/docs/internals/flink_security.md
+++ /dev/null
@@ -1,146 +0,0 @@
----
-title: "Flink Security"
-# Top navigation
-top-nav-group: internals
-top-nav-pos: 10
-top-nav-title: Flink Security
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos),
-filesystems, connectors, and state backends.
-
-## Objective
-The primary goals of the Flink Kerberos security infrastructure are:
-1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
-2. to authenticate to ZooKeeper (if configured to use SASL)
-3. to authenticate to Hadoop components (e.g. HDFS, HBase)
-
-In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure
-data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
-or ticket cache entry.
-
-The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
-or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. To use a different keytab
-for for a certain job, simply launch a separate Flink cluster with a different configuration. Numerous Flink clusters may run side-by-side in a YARN
-or Mesos environment.
-
-## How Flink Security works
-In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.). While satisfying the security requirements for all connectors is an ongoing effort,
-Flink provides first-class support for Kerberos authentication only. The following services and connectors are tested for Kerberos authentication:
-
-- Kafka (0.9+)
-- HDFS
-- HBase
-- ZooKeeper
-
-Note that it is possible to enable the use of Kerberos independently for each service or connector. For example, the user may enable
-Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa. The shared element is the configuration of
-Kerbreros credentials, which is then explicitly used by each component.
-
-The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
-are installed at startup. The next section describes each security module.
-
-### Hadoop Security Module
-This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context. The login user is
-then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
-
-If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured. Otherwise,
-the login user conveys only the user identity of the OS account that launched the cluster.
-
-### JAAS Security Module
-This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
-Kafka, and other such components that rely on JAAS.
-
-Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html). Static entries override any
-dynamic entries provided by this module.
-
-### ZooKeeper Security Module
-This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
-and the JAAS login context name (default: `Client`).
-
-## Security Configuration
-
-### Flink Configuration
-The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
-
-- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
-
-A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
-
-- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-
-- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-
-These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context. Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section). To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
-
-- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
-
-ZooKeeper-related configuration overrides:
-
-- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.
-
-- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
-one of the values specified in `security.kerberos.login.contexts`.
-
-### Hadoop Configuration
-
-The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`). The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
-
-Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts. In this scenario, the Flink CLI acquires Hadoop
-delegation tokens (for HDFS and for HBase).
-
-## Deployment Modes
-Here is some information specific to each deployment mode.
-
-### Standalone Mode
-
-Steps to run a secure Flink cluster in standalone/cluster mode:
-1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
-2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
-3. Deploy Flink cluster as normal.
-
-### YARN/Mesos Mode
-
-Steps to run a secure Flink cluster in YARN/Mesos mode:
-1. Add security-related configuration options to the Flink configuration file on the client.
-2. Ensure that the keytab file exists at the path as indicated by `security.kerberos.login.keytab` on the client node.
-3. Deploy Flink cluster as normal.
-
-In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.
-
-For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
-
-#### Using `kinit` (YARN only)
-
-In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
-This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it. The main drawback is
-that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
-
-Steps to run a secure Flink cluster using `kinit`:
-1. Add security-related configuration options to the Flink configuration file on the client.
-2. Login using the `kinit` command.
-3. Deploy Flink cluster as normal.
-
-## Further Details
-### Ticket Renewal
-Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
-Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab. In the delegation token scenario,
-YARN itself renews the token (up to its maximum lifespan).
http://git-wip-us.apache.org/repos/asf/flink/blob/699f4b05/docs/ops/security-kerberos.md
----------------------------------------------------------------------
diff --git a/docs/ops/security-kerberos.md b/docs/ops/security-kerberos.md
new file mode 100644
index 0000000..2afe760
--- /dev/null
+++ b/docs/ops/security-kerberos.md
@@ -0,0 +1,145 @@
+---
+title: "Kerberos Authentication Setup and Configuration"
+nav-parent_id: setup
+nav-pos: 10
+nav-title: Kerberos
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos),
+filesystems, connectors, and state backends.
+
+## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase)
+
+In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure
+data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. To use a different keytab
+for for a certain job, simply launch a separate Flink cluster with a different configuration. Numerous Flink clusters may run side-by-side in a YARN
+or Mesos environment.
+
+## How Flink Security works
+In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.). While satisfying the security requirements for all connectors is an ongoing effort,
+Flink provides first-class support for Kerberos authentication only. The following services and connectors are tested for Kerberos authentication:
+
+- Kafka (0.9+)
+- HDFS
+- HBase
+- ZooKeeper
+
+Note that it is possible to enable the use of Kerberos independently for each service or connector. For example, the user may enable
+Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa. The shared element is the configuration of
+Kerbreros credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup. The next section describes each security module.
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context. The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
+
+If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured. Otherwise,
+the login user conveys only the user identity of the OS account that launched the cluster.
+
+### JAAS Security Module
+This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
+Kafka, and other such components that rely on JAAS.
+
+Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html). Static entries override any
+dynamic entries provided by this module.
+
+### ZooKeeper Security Module
+This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
+and the JAAS login context name (default: `Client`).
+
+## Security Configuration
+
+### Flink Configuration
+The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
+
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
+
+A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
+
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
+
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
+
+These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context. Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section). To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
+
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
+
+ZooKeeper-related configuration overrides:
+
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.
+
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
+
+### Hadoop Configuration
+
+The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`). The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
+
+Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts. In this scenario, the Flink CLI acquires Hadoop
+delegation tokens (for HDFS and for HBase).
+
+## Deployment Modes
+Here is some information specific to each deployment mode.
+
+### Standalone Mode
+
+Steps to run a secure Flink cluster in standalone/cluster mode:
+1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
+3. Deploy Flink cluster as normal.
+
+### YARN/Mesos Mode
+
+Steps to run a secure Flink cluster in YARN/Mesos mode:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Ensure that the keytab file exists at the path as indicated by `security.kerberos.login.keytab` on the client node.
+3. Deploy Flink cluster as normal.
+
+In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.
+
+For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
+
+#### Using `kinit` (YARN only)
+
+In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
+This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it. The main drawback is
+that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
+
+Steps to run a secure Flink cluster using `kinit`:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Login using the `kinit` command.
+3. Deploy Flink cluster as normal.
+
+## Further Details
+### Ticket Renewal
+Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
+Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab. In the delegation token scenario,
+YARN itself renews the token (up to its maximum lifespan).
[3/5] flink git commit: [FLINK-5364] [security] Rework JAAS
configuration to support user-supplied entries
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/resources/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/flink-jaas.conf b/flink-runtime/src/main/resources/flink-jaas.conf
index 7f0f06b..d287ff4 100644
--- a/flink-runtime/src/main/resources/flink-jaas.conf
+++ b/flink-runtime/src/main/resources/flink-jaas.conf
@@ -1,3 +1,4 @@
+/**
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,10 +18,6 @@
################################################################################
# We are using this file as an workaround for the Kafka and ZK SASL implementation
# since they explicitly look for java.security.auth.login.config property
-# The file itself is not used by the application since the internal implementation
-# uses a process-wide in-memory java security configuration object.
# Please do not edit/delete this file - See FLINK-3929
-sample {
- useKeyTab=false
- useTicketCache=true;
-}
+**/
+
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
index 0570f28..1847ec4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.overlays;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.junit.Rule;
@@ -46,7 +47,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase {
ContainerSpecification spec = new ContainerSpecification();
overlay.configure(spec);
- assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, null));
+ assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
checkArtifact(spec, TARGET_PATH);
}
@@ -64,7 +65,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase {
final Configuration conf = new Configuration();
File keytab = tempFolder.newFile();
- conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab.getAbsolutePath());
+ conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytab.getAbsolutePath());
KeytabOverlay.Builder builder = KeytabOverlay.newBuilder().fromEnvironment(conf);
assertEquals(builder.keytabPath, keytab);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
deleted file mode 100644
index 89e5ef9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security;
-
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.junit.Test;
-
-import javax.security.auth.login.AppConfigurationEntry;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the {@link JaasConfiguration}.
- */
-public class JaasConfigurationTest {
-
- @Test
- public void testInvalidKerberosParams() {
- String keytab = "user.keytab";
- String principal = null;
- try {
- new JaasConfiguration(keytab, principal);
- } catch(RuntimeException re) {
- assertEquals("Both keytab and principal are required and cannot be empty",re.getMessage());
- }
- }
-
- @Test
- public void testDefaultAceEntry() {
- JaasConfiguration conf = new JaasConfiguration(null,null);
- javax.security.auth.login.Configuration.setConfiguration(conf);
- final AppConfigurationEntry[] entry = conf.getAppConfigurationEntry("test");
- AppConfigurationEntry ace = entry[0];
- assertEquals(ace.getLoginModuleName(), KerberosUtil.getKrb5LoginModuleName());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
new file mode 100644
index 0000000..4c899e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for the {@link KerberosUtils}.
+ */
+public class KerberosUtilsTest {
+
+ @Test
+ public void testTicketCacheEntry() {
+ AppConfigurationEntry entry = KerberosUtils.ticketCacheEntry();
+ assertNotNull(entry);
+ }
+
+ @Test
+ public void testKeytabEntry() {
+ String keytab = "user.keytab";
+ String principal = "user";
+ AppConfigurationEntry entry = KerberosUtils.keytabEntry(keytab, principal);
+ assertNotNull(entry);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index 2648a7a..c5624f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -18,97 +18,64 @@
package org.apache.flink.runtime.security;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.flink.runtime.security.modules.SecurityModule;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.Test;
-import java.lang.reflect.Method;
+import java.util.Collections;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
/**
* Tests for the {@link SecurityUtils}.
*/
public class SecurityUtilsTest {
- @AfterClass
- public static void afterClass() {
- SecurityUtils.clearContext();
- System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
- }
+ static class TestSecurityModule implements SecurityModule {
+ boolean installed;
- @Test
- public void testCreateInsecureHadoopCtx() {
- SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(new Configuration());
- try {
- SecurityUtils.install(sc);
- assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName());
- } catch (Exception e) {
- fail(e.getMessage());
+ @Override
+ public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+ installed = true;
}
- }
- @Test
- public void testInvalidUGIContext() {
- try {
- new HadoopSecurityContext(null);
- } catch (RuntimeException re) {
- assertEquals("UGI passed cannot be null",re.getMessage());
+ @Override
+ public void uninstall() throws SecurityInstallException {
+ installed = false;
}
}
+ @AfterClass
+ public static void afterClass() {
+ SecurityUtils.uninstall();
+ }
+
@Test
- /**
- * The Jaas configuration file provided should not be overridden.
- */
- public void testJaasPropertyOverride() throws Exception {
- String confFile = "jaas.conf";
- System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, confFile);
+ public void testModuleInstall() throws Exception {
+ SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(
+ new Configuration(), new org.apache.hadoop.conf.Configuration(),
+ Collections.singletonList(TestSecurityModule.class));
- SecurityUtils.install(new SecurityUtils.SecurityConfiguration(new Configuration()));
+ SecurityUtils.install(sc);
+ assertEquals(1, SecurityUtils.getInstalledModules().size());
+ TestSecurityModule testModule = (TestSecurityModule) SecurityUtils.getInstalledModules().get(0);
+ assertTrue(testModule.installed);
- Assert.assertEquals(
- confFile,
- System.getProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG));
+ SecurityUtils.uninstall();
+ assertNull(SecurityUtils.getInstalledModules());
+ assertFalse(testModule.installed);
}
+ @Test
+ public void testSecurityContext() throws Exception {
+ SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(
+ new Configuration(), new org.apache.hadoop.conf.Configuration(),
+ Collections.singletonList(TestSecurityModule.class));
- private String getOSUserName() throws Exception {
- String userName = "";
- OperatingSystem os = OperatingSystem.getCurrentOperatingSystem();
- String className;
- String methodName;
-
- switch(os) {
- case LINUX:
- case MAC_OS:
- className = "com.sun.security.auth.module.UnixSystem";
- methodName = "getUsername";
- break;
- case WINDOWS:
- className = "com.sun.security.auth.module.NTSystem";
- methodName = "getName";
- break;
- case SOLARIS:
- className = "com.sun.security.auth.module.SolarisSystem";
- methodName = "getUsername";
- break;
- case FREE_BSD:
- case UNKNOWN:
- default:
- className = null;
- methodName = null;
- }
+ SecurityUtils.install(sc);
+ assertEquals(HadoopSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
- if( className != null ){
- Class<?> c = Class.forName( className );
- Method method = c.getDeclaredMethod( methodName );
- Object o = c.newInstance();
- userName = (String) method.invoke( o );
- }
- return userName;
+ SecurityUtils.uninstall();
+ assertEquals(NoOpSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index de715c6..10450c3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -18,10 +18,10 @@
package org.apache.flink.test.util;
-import org.apache.flink.configuration.ConfigConstants;
+
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.hadoop.minikdc.MiniKdc;
import org.junit.rules.TemporaryFolder;
@@ -30,10 +30,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
-import java.io.FileWriter;
-import java.io.BufferedWriter;
-import java.io.PrintWriter;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -111,11 +107,11 @@ public class SecureTestEnvironment {
//the context can be reinitialized with Hadoop configuration by calling
//ctx.setHadoopConfiguration() for the UGI implementation to work properly.
//See Yarn test case module for reference
- createJaasConfig(baseDirForSecureRun);
Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
- flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, testKeytab);
+ flinkConfig.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, testPrincipal);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Client,KafkaClient");
SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
@@ -178,8 +174,8 @@ public class SecureTestEnvironment {
conf = flinkConf;
}
- conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY , testKeytab);
- conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY , testPrincipal);
+ conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB , testKeytab);
+ conf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL , testPrincipal);
return conf;
}
@@ -190,22 +186,19 @@ public class SecureTestEnvironment {
if(testZkServerPrincipal != null ) {
TestingSecurityContext.ClientSecurityConfiguration zkServer =
- new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab,
- "Server", "zk-server");
+ new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab);
clientSecurityConfigurationMap.put("Server",zkServer);
}
if(testZkClientPrincipal != null ) {
TestingSecurityContext.ClientSecurityConfiguration zkClient =
- new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab,
- "Client", "zk-client");
+ new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab);
clientSecurityConfigurationMap.put("Client",zkClient);
}
if(testKafkaServerPrincipal != null ) {
TestingSecurityContext.ClientSecurityConfiguration kafkaServer =
- new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab,
- "KafkaServer", "kafka-server");
+ new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab);
clientSecurityConfigurationMap.put("KafkaServer",kafkaServer);
}
@@ -220,23 +213,4 @@ public class SecureTestEnvironment {
return hadoopServicePrincipal;
}
- /*
- * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL
- * implementation lookup java.security.auth.login.config
- */
- private static void createJaasConfig(File baseDirForSecureRun) {
-
- try(FileWriter fw = new FileWriter(new File(baseDirForSecureRun, SecurityUtils.JAAS_CONF_FILENAME), true);
- BufferedWriter bw = new BufferedWriter(fw);
- PrintWriter out = new PrintWriter(bw))
- {
- out.println("sample {");
- out.println("useKeyTab=false");
- out.println("useTicketCache=true;");
- out.println("};");
- } catch (IOException e) {
- throw new RuntimeException("Exception occured while trying to create JAAS config.", e);
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
deleted file mode 100644
index 25b2362..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.security.JaasConfiguration;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * {@link TestingJaasConfiguration} for handling the integration test case since it requires to manage
- * client principal as well as server principals of Hadoop/ZK which expects the host name to be populated
- * in specific way (localhost vs 127.0.0.1). This provides an abstraction to handle more than one Login Module
- * since the default {@link JaasConfiguration} behavior only supports global/unique principal identifier
- */
-
-@Internal
-public class TestingJaasConfiguration extends JaasConfiguration {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestingJaasConfiguration.class);
-
- public Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap;
-
- TestingJaasConfiguration(String keytab, String principal, Map<String,
- TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap) {
- super(keytab, principal);
- this.clientSecurityConfigurationMap = clientSecurityConfigurationMap;
- }
-
- @Override
- public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
-
- LOG.debug("In TestingJaasConfiguration - Application Requested: {}", applicationName);
-
- AppConfigurationEntry[] appConfigurationEntry = super.getAppConfigurationEntry(applicationName);
-
- if(clientSecurityConfigurationMap != null && clientSecurityConfigurationMap.size() > 0) {
-
- if(clientSecurityConfigurationMap.containsKey(applicationName)) {
-
- LOG.debug("In TestingJaasConfiguration - Application: {} found in the supplied context", applicationName);
-
- TestingSecurityContext.ClientSecurityConfiguration conf = clientSecurityConfigurationMap.get(applicationName);
-
- if(appConfigurationEntry != null && appConfigurationEntry.length > 0) {
-
- for(int count=0; count < appConfigurationEntry.length; count++) {
-
- AppConfigurationEntry ace = appConfigurationEntry[count];
-
- if (ace.getOptions().containsKey("keyTab")) {
-
- String keyTab = conf.getKeytab();
- String principal = conf.getPrincipal();
-
- LOG.debug("In TestingJaasConfiguration - Application: {} from the supplied context will " +
- "use Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, principal);
-
- Map<String, String> newKeytabKerberosOptions = new HashMap<>();
- newKeytabKerberosOptions.putAll(getKeytabKerberosOptions());
-
- newKeytabKerberosOptions.put("keyTab", keyTab);
- newKeytabKerberosOptions.put("principal", principal);
-
- AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
- KerberosUtil.getKrb5LoginModuleName(),
- AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
- newKeytabKerberosOptions);
- appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce};
-
- LOG.debug("---->Login Module is using Keytab based configuration<------");
- LOG.debug("Login Module Name: " + keytabKerberosAce.getLoginModuleName());
- LOG.debug("Control Flag: " + keytabKerberosAce.getControlFlag());
- LOG.debug("Options: " + keytabKerberosAce.getOptions());
- }
- }
- }
- }
-
- }
-
- return appConfigurationEntry;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
index 4343013..ff1810b 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -19,10 +19,16 @@
package org.apache.flink.test.util;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.JaasModule;
+import javax.security.auth.login.AppConfigurationEntry;
import java.util.Map;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/*
* Test security context to support handling both client and server principals in MiniKDC
* This class is used only in integration test code for connectors like Kafka, HDFS etc.,
@@ -36,21 +42,20 @@ public class TestingSecurityContext {
SecurityUtils.install(config);
- // establish the JAAS config for Test environment
- TestingJaasConfiguration jaasConfig = new TestingJaasConfiguration(config.getKeytab(),
- config.getPrincipal(), clientSecurityConfigurationMap);
- javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+ // install dynamic JAAS entries
+ checkArgument(config.getSecurityModules().contains(JaasModule.class));
+ DynamicConfiguration jaasConf = (DynamicConfiguration) javax.security.auth.login.Configuration.getConfiguration();
+ for(Map.Entry<String,ClientSecurityConfiguration> e : clientSecurityConfigurationMap.entrySet()) {
+ AppConfigurationEntry entry = KerberosUtils.keytabEntry(e.getValue().getKeytab(), e.getValue().getPrincipal());
+ jaasConf.addAppConfigurationEntry(e.getKey(), entry);
+ }
}
public static class ClientSecurityConfiguration {
- private String principal;
-
- private String keytab;
+ private final String principal;
- private String moduleName;
-
- private String jaasServiceName;
+ private final String keytab;
public String getPrincipal() {
return principal;
@@ -60,21 +65,10 @@ public class TestingSecurityContext {
return keytab;
}
- public String getModuleName() {
- return moduleName;
- }
-
- public String getJaasServiceName() {
- return jaasServiceName;
- }
-
- public ClientSecurityConfiguration(String principal, String keytab, String moduleName, String jaasServiceName) {
+ public ClientSecurityConfiguration(String principal, String keytab) {
this.principal = principal;
this.keytab = keytab;
- this.moduleName = moduleName;
- this.jaasServiceName = jaasServiceName;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 45fd8d0..d3558a9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -18,8 +18,8 @@
package org.apache.flink.yarn;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.test.util.TestingSecurityContext;
@@ -53,13 +53,13 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
SecureTestEnvironment.getTestKeytab());
Configuration flinkConfig = new Configuration();
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
SecureTestEnvironment.getTestKeytab());
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
SecureTestEnvironment.getHadoopServicePrincipal());
- SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
- ctx.setHadoopConfiguration(yarnConfiguration);
+ SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig,
+ yarnConfiguration);
try {
TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index dc7cca3..ca8a0da 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
@@ -424,8 +425,8 @@ public abstract class YarnTestBase extends TestLogger {
LOG.info("writing keytab: " + keytab + " and principal: " + principal + " to config file");
out.println("");
out.println("#Security Configurations Auto Populated ");
- out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab);
- out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
+ out.println(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + ": " + keytab);
+ out.println(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + ": " + principal);
out.println("");
} catch (IOException e) {
throw new RuntimeException("Exception occured while trying to append the security configurations.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ca18439..70f3222 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,11 +22,11 @@ import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -404,6 +404,18 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
@Override
public YarnClusterClient deploy() {
try {
+ if(UserGroupInformation.isSecurityEnabled()) {
+ // note: UGI::hasKerberosCredentials inaccurately reports false
+ // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+ // so we check only in ticket cache scenario.
+ boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+ UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+ if (useTicketCache && !loginUser.hasKerberosCredentials()) {
+ LOG.error("Hadoop security is enabled but the login user does not have Kerberos credentials");
+ throw new RuntimeException("Hadoop security is enabled but the login user " +
+ "does not have Kerberos credentials");
+ }
+ }
return deployInternal();
} catch (Exception e) {
throw new RuntimeException("Couldn't deploy Yarn cluster", e);
@@ -583,12 +595,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- //check if there is a JAAS config file
- File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityUtils.JAAS_CONF_FILENAME);
- if (jaasConfigFile.exists() && jaasConfigFile.isFile()) {
- effectiveShipFiles.add(jaasConfigFile);
- }
-
addLibFolderToShipFiles(effectiveShipFiles);
// add the user jar to the classpath of the to-be-created cluster
@@ -730,7 +736,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// setup security tokens
LocalResource keytabResource = null;
Path remotePathKeytab = null;
- String keytab = flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+ String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if(keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
keytabResource = Records.newRecord(LocalResource.class);
@@ -773,7 +779,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
if(keytabResource != null) {
appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString() );
- String principal = flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+ String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal );
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 1826d43..e4027d4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -169,21 +170,27 @@ public class YarnApplicationMasterRunner {
final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
if(keytabPath != null && remoteKeytabPrincipal != null) {
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
- SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
+ org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
//To support Yarn Secure Integration Test Scenario
File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
if(krb5Conf.exists() && krb5Conf.canRead()) {
String krb5Path = krb5Conf.getAbsolutePath();
LOG.info("KRB5 Conf: {}", krb5Path);
- org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
- sc.setHadoopConfiguration(conf);
+ hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+ }
+
+ SecurityUtils.SecurityConfiguration sc;
+ if(hadoopConfiguration != null) {
+ sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
+ } else {
+ sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
}
SecurityUtils.install(sc);
@@ -256,8 +263,8 @@ public class YarnApplicationMasterRunner {
LOG.info("keytabPath: {}", keytabPath);
}
if(keytabPath != null && remoteKeytabPrincipal != null) {
- config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
- config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
// Hadoop/Yarn configuration (loads config data automatically from classpath files)
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 015eb1b..059f1aa 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -111,22 +112,28 @@ public class YarnTaskManagerRunner {
try {
- SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+ org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
//To support Yarn Secure Integration Test Scenario
File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
if(krb5Conf.exists() && krb5Conf.canRead()) {
String krb5Path = krb5Conf.getAbsolutePath();
LOG.info("KRB5 Conf: {}", krb5Path);
- org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
- sc.setHadoopConfiguration(conf);
+ hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+ }
+
+ SecurityUtils.SecurityConfiguration sc;
+ if(hadoopConfiguration != null) {
+ sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
+ } else {
+ sc = new SecurityUtils.SecurityConfiguration(configuration);
}
if(keytabPath != null && remoteKeytabPrincipal != null) {
- configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
- configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
SecurityUtils.install(sc);
[5/5] flink git commit: [FLINK-5442] [streaming] Ensure order of enum
elements in StateDescriptor.Type through a test
Posted by se...@apache.org.
[FLINK-5442] [streaming] Ensure order of enum elements in StateDescriptor.Type through a test
This closes #3091
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/758ea79a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/758ea79a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/758ea79a
Branch: refs/heads/release-1.2
Commit: 758ea79a1cda1dec58d43266487ce663f7205f86
Parents: 8d3ad45
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Jan 10 22:52:58 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 19:06:10 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/state/StateDescriptor.java | 3 ++-
.../runtime/state/SerializationProxiesTest.java | 16 ++++++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/758ea79a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index de3cd4e..ad9d417 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -49,8 +49,9 @@ import static java.util.Objects.requireNonNull;
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {
+ // Do not change the order of the elements in this enum, ordinal is used in serialization
public enum Type {
- VALUE, LIST, REDUCING, FOLDING, @Deprecated UNKNOWN
+ @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING
}
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/758ea79a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 9211e92..832b022 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -96,4 +96,20 @@ public class SerializationProxiesTest {
Assert.assertEquals(name, metaInfo.getStateName());
}
+
+ /**
+ * This test fixes the order of elements in the enum which is important for serialization. Do not modify this test
+ * except if you are entirely sure what you are doing.
+ */
+ @Test
+ public void testFixTypeOrder() {
+ // ensure all elements are covered
+ Assert.assertEquals(5, StateDescriptor.Type.values().length);
+ // fix the order of elements to keep serialization format stable
+ Assert.assertEquals(0, StateDescriptor.Type.UNKNOWN.ordinal());
+ Assert.assertEquals(1, StateDescriptor.Type.VALUE.ordinal());
+ Assert.assertEquals(2, StateDescriptor.Type.LIST.ordinal());
+ Assert.assertEquals(3, StateDescriptor.Type.REDUCING.ordinal());
+ Assert.assertEquals(4, StateDescriptor.Type.FOLDING.ordinal());
+ }
}
\ No newline at end of file
[2/5] flink git commit: [FLINK-5427] [docs] Fix code example in
event_timestamps_watermarks.md
Posted by se...@apache.org.
[FLINK-5427] [docs] Fix code example in event_timestamps_watermarks.md
This closes #3082
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1750b0dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1750b0dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1750b0dd
Branch: refs/heads/release-1.2
Commit: 1750b0dda8c243a215dcd558baf36ea1053a4310
Parents: 758ea79
Author: fengyelei 00406569 <fe...@huawei.com>
Authored: Mon Jan 9 18:37:59 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 19:06:10 2017 +0100
----------------------------------------------------------------------
docs/dev/event_timestamps_watermarks.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1750b0dd/docs/dev/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md
index 8d152df..a16aa55 100644
--- a/docs/dev/event_timestamps_watermarks.md
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -304,7 +304,7 @@ public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
- return element.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
+ return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
{% endhighlight %}
@@ -318,7 +318,7 @@ class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
- if (element.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
+ if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}
{% endhighlight %}
[4/5] flink git commit: [FLINK-5364] [security] Rework JAAS
configuration to support user-supplied entries
Posted by se...@apache.org.
[FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries
Fixes FLINK-5364, FLINK-5361, FLINK-5350, FLINK-5055
This closes #3057
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00193f7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00193f7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00193f7e
Branch: refs/heads/release-1.2
Commit: 00193f7e238340cc18c57a44c7e6377432839373
Parents: 1750b0d
Author: wrighe3 <er...@emc.com>
Authored: Tue Dec 20 01:07:38 2016 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 19:06:10 2017 +0100
----------------------------------------------------------------------
docs/internals/flink_security.md | 133 +++++---
docs/setup/config.md | 55 +++-
.../connectors/fs/RollingSinkSecuredITCase.java | 8 +-
.../flink/configuration/ConfigConstants.java | 14 -
.../configuration/HighAvailabilityOptions.java | 8 -
.../flink/configuration/SecurityOptions.java | 62 ++++
flink-dist/src/main/resources/flink-conf.yaml | 21 +-
.../java/hadoop/mapred/utils/HadoopUtils.java | 21 ++
.../MesosApplicationMasterRunner.java | 2 -
.../MesosTaskManagerRunner.java | 2 -
.../overlays/KeytabOverlay.java | 14 +-
.../runtime/security/DynamicConfiguration.java | 111 +++++++
.../runtime/security/JaasConfiguration.java | 160 ---------
.../flink/runtime/security/KerberosUtils.java | 125 +++++++
.../flink/runtime/security/SecurityUtils.java | 322 ++++++++-----------
.../runtime/security/modules/HadoopModule.java | 119 +++++++
.../runtime/security/modules/JaasModule.java | 146 +++++++++
.../security/modules/SecurityModule.java | 59 ++++
.../security/modules/ZooKeeperModule.java | 76 +++++
.../src/main/resources/flink-jaas.conf | 9 +-
.../overlays/KeytabOverlayTest.java | 5 +-
.../runtime/security/JaasConfigurationTest.java | 52 ---
.../runtime/security/KerberosUtilsTest.java | 48 +++
.../runtime/security/SecurityUtilsTest.java | 105 +++---
.../flink/test/util/SecureTestEnvironment.java | 48 +--
.../test/util/TestingJaasConfiguration.java | 106 ------
.../flink/test/util/TestingSecurityContext.java | 38 +--
.../yarn/YARNSessionFIFOSecuredITCase.java | 10 +-
.../org/apache/flink/yarn/YarnTestBase.java | 5 +-
.../yarn/AbstractYarnClusterDescriptor.java | 26 +-
.../flink/yarn/YarnApplicationMasterRunner.java | 25 +-
.../flink/yarn/YarnTaskManagerRunner.java | 21 +-
32 files changed, 1182 insertions(+), 774 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
index 846273b..a83f3b9 100644
--- a/docs/internals/flink_security.md
+++ b/docs/internals/flink_security.md
@@ -24,64 +24,123 @@ specific language governing permissions and limitations
under the License.
-->
-This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN)
-and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers
-who plans to run Flink on a secure environment.
+This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos),
+filesystems, connectors, and state backends.
## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase)
-The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario,
-streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure
-data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the
-context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster.
+In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure
+data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. To use a different keytab
+for for a certain job, simply launch a separate Flink cluster with a different configuration. Numerous Flink clusters may run side-by-side in a YARN
+or Mesos environment.
## How Flink Security works
-Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI.
-A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security
-requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolves over a period
-of time, at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security.
+In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.). While satisfying the security requirements for all connectors is an ongoing effort,
+Flink provides first-class support for Kerberos authentication only. The following services and connectors are tested for Kerberos authentication:
-- Kafka (0.9)
+- Kafka (0.9+)
- HDFS
+- HBase
- ZooKeeper
-Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation
-(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context.
+Note that it is possible to enable the use of Kerberos independently for each service or connector. For example, the user may enable
+Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa. The shared element is the configuration of
+Kerbreros credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup. The next section describes each security module.
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context. The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
+
+If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured. Otherwise,
+the login user conveys only the user identity of the OS account that launched the cluster.
+
+### JAAS Security Module
+This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
+Kafka, and other such components that rely on JAAS.
+
+Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html). Static entries override any
+dynamic entries provided by this module.
+
+### ZooKeeper Security Module
+This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
+and the JAAS login context name (default: `Client`).
+
+## Security Configuration
+
+### Flink Configuration
+The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
-Services like Kafka and ZooKeeper use SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. It expects JAAS configuration with a platform-specific login
-module *name* to be provided. Managing per-connector configuration files will be an overhead and to overcome this requirement, a process-wide JAAS configuration object is
-instantiated which serves standard ApplicationConfigurationEntry for the connectors that authenticates using SASL/JAAS mechanism.
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
-It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself uses UGI's doAS() implementation to run under a specific user context, i.e. if Hadoop security is enabled
-then the Flink processes will be running under a secure user account or else it will run as the OS login user account who starts the Flink cluster.
+A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
-## Security Configurations
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-Secure credentials can be supplied by adding below configuration elements to Flink configuration file:
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret.
+These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context. Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section). To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
-- `security.principal`: User principal name that the Flink cluster should run as.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
-The delegation token mechanism (*kinit cache*) is still supported for backward compatibility but enabling security using *keytab* configuration is the preferred and recommended approach.
+ZooKeeper-related configuration overrides:
-## Standalone Mode:
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.
+
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
+
+### Hadoop Configuration
+
+The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`). The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
+
+Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts. In this scenario, the Flink CLI acquires Hadoop
+delegation tokens (for HDFS and for HBase).
+
+## Deployment Modes
+Here is some information specific to each deployment mode.
+
+### Standalone Mode
Steps to run a secure Flink cluster in standalone/cluster mode:
-- Add security configurations to Flink configuration file (on all cluster nodes)
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration on all cluster nodes
-- Deploy Flink cluster using cluster start/stop scripts or CLI
+1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
+3. Deploy Flink cluster as normal.
+
+### YARN/Mesos Mode
+
+Steps to run a secure Flink cluster in YARN/Mesos mode:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Ensure that the keytab file exists at the path as indicated by `security.kerberos.login.keytab` on the client node.
+3. Deploy Flink cluster as normal.
+
+In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.
-## Yarn Mode:
+For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
-Steps to run secure Flink cluster in Yarn mode:
-- Add security configurations to Flink configuration file (on the node from where cluster will be provisioned using Flink/Yarn CLI)
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration
-- Deploy Flink cluster using CLI
+#### Using `kinit` (YARN only)
-In Yarn mode, the user supplied keytab will be copied over to the Yarn containers (App Master/JM and TM) as the Yarn local resource file.
-Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">Yarn security</a>
+In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
+This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it. The main drawback is
+that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
-## Token Renewal
+Steps to run a secure Flink cluster using `kinit`:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Login using the `kinit` command.
+3. Deploy Flink cluster as normal.
-UGI and Kafka/ZK login module implementations takes care of auto-renewing the tickets upon reaching expiry and no further action is needed on the part of Flink.
\ No newline at end of file
+## Further Details
+### Ticket Renewal
+Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
+Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab. In the delegation token scenario,
+YARN itself renews the token (up to its maximum lifespan).
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 512c08a..3ef4ffd 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -96,37 +96,58 @@ These options are useful for debugging a Flink application for memory and garbag
- `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
-### Kerberos
+### Kerberos-based Security
-Flink supports Kerberos authentication for the following services
+Flink supports Kerberos authentication for the following services:
-+ Hadoop Components: such as HDFS, YARN, or HBase.
++ Hadoop Components (such as HDFS, YARN, or HBase)
+ Kafka Connectors (version 0.9+)
-+ Zookeeper Server/Client
++ Zookeeper
-Hadoop components relies on the UserGroupInformation (UGI) implementation to handle Kerberos authentication, whereas Kafka and Zookeeper services handles Kerberos authentication through SASL/JAAS implementation.
-
-**Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
+**Kerberos is supported only in Hadoop version 2.6.1 and above. All
other versions have critical bugs which might fail the Flink job
unexpectedly.**
-**Ticket cache** and **Keytab** modes are supported for all above mentioned services.
+Configuring Flink for Kerberos security involves three aspects:
+
+1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via `kinit`)
+2. Making the Kerberos credential available to components and connectors as needed
+3. Configuring the component and/or connector to use Kerberos authentication
+
+To provide the cluster with a Kerberos credential, either configure the login keytab using the below configuration options,
+or login using `kinit` before starting the cluster.
+
+It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues. If you prefer to use the ticket cache,
+talk to your administrator about increasing the Hadoop delegation token lifetime.
+
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from your Kerberos ticket cache (default: `true`).
+
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-> Ticket cache (Supported only to provide backward compatibility support. Keytab is the preferred approach for long running jobs)
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-While Hadoop uses Kerberos tickets to authenticate users with services initially, the authentication process continues differently afterwards. Instead of saving the ticket to authenticate on a later access, Hadoop creates its own security tokens (DelegationToken) that it passes around. These are authenticated to Kerberos periodically but are independent of the token renewal time. The tokens have a maximum life span identical to the Kerberos ticket maximum life span.
+If Hadoop security is enabled (in `core-site.xml`), Flink will automatically use the configured Kerberos credentials when connecting to HDFS, HBase, and other Hadoop components.
-While using ticket cache mode, please make sure to set the maximum ticket life span high long running jobs.
+Make the Kerberos credentials available to any connector or component that uses a JAAS configuration file by configuring JAAS login contexts.
-If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication).
+
+You may also provide a static JAAS configuration file, whose entries override those produced by the above configuration option.
+
+Be sure to configure the connector within your Flink program as necessary to use Kerberos authentication. For the Kafka connector,
+use the following properties:
+
+```
+security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+sasl.kerberos.service.name=kafka
+```
-> Keytab (security principal and keytab can be configured through Flink configuration file)
-- `security.keytab`: Path to Keytab file
-- `security.principal`: Principal associated with the keytab
+Flink provides some additional options to configure ZooKeeper security:
-Kerberos ticket renewal is abstracted and automatically handled by the Hadoop/Kafka/ZK login modules and ensures that tickets are renewed in time and you can be sure to be authenticated until the end of the ticket life time.
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`).
-For Kafka and ZK, process-wide JAAS config will be created using the provided security credentials and the Kerberos authentication will be handled by Kafka/ZK login handlers.
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
### Other
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index eb12d07..fa46fc7 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SecureTestEnvironment;
@@ -116,13 +117,12 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
populateSecureConfigurations();
Configuration flinkConfig = new Configuration();
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
SecureTestEnvironment.getTestKeytab());
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
SecureTestEnvironment.getHadoopServicePrincipal());
- SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
- ctx.setHadoopConfiguration(conf);
+ SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig, conf);
try {
TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index eabb754..fc389e0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1395,20 +1395,6 @@ public final class ConfigConstants {
/** The environment variable name which contains the Flink installation root directory */
public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
- // -------------------------------- Security -------------------------------
-
- /**
- * The config parameter defining security credentials required
- * for securing Flink cluster.
- */
-
- /** Keytab file key name to be used in flink configuration file */
- public static final String SECURITY_KEYTAB_KEY = "security.keytab";
-
- /** Kerberos security principal key name to be used in flink configuration file */
- public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
-
-
/**
* Not instantiable.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 1ee988a..4792eba 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -124,14 +124,6 @@ public class HighAvailabilityOptions {
.defaultValue(3)
.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
- public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
- key("zookeeper.sasl.disable")
- .defaultValue(true);
-
- public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
- key("zookeeper.sasl.service-name")
- .noDefaultValue();
-
// ------------------------------------------------------------------------
/** Not intended to be instantiated */
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
new file mode 100644
index 0000000..67d101d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to security.
+ */
+public class SecurityOptions {
+
+ // ------------------------------------------------------------------------
+ // Kerberos Options
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption<String> KERBEROS_LOGIN_PRINCIPAL =
+ key("security.kerberos.login.principal")
+ .noDefaultValue()
+ .withDeprecatedKeys("security.principal");
+
+ public static final ConfigOption<String> KERBEROS_LOGIN_KEYTAB =
+ key("security.kerberos.login.keytab")
+ .noDefaultValue()
+ .withDeprecatedKeys("security.keytab");
+
+ public static final ConfigOption<Boolean> KERBEROS_LOGIN_USETICKETCACHE =
+ key("security.kerberos.login.use-ticket-cache")
+ .defaultValue(true);
+
+ public static final ConfigOption<String> KERBEROS_LOGIN_CONTEXTS =
+ key("security.kerberos.login.contexts")
+ .noDefaultValue();
+
+
+ // ------------------------------------------------------------------------
+ // ZooKeeper Security Options
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
+ key("zookeeper.sasl.service-name")
+ .defaultValue("zookeeper");
+
+ public static final ConfigOption<String> ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME =
+ key("zookeeper.sasl.login-context-name")
+ .defaultValue("Client");
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index c650cfe..f759db6 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -162,21 +162,24 @@ jobmanager.web.port: 8081
# Flink Cluster Security Configuration (optional configuration)
#==============================================================================
-# Kerberos security for the connectors can be enabled by providing below configurations
-# Security works in two modes - keytab/principal combination or using the Kerberos token cache
-# If keytab and principal are not provided, token cache (manual kinit) will be used
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
-#security.keytab: /path/to/kerberos/keytab
-#security.principal: flink-user
+#security.kerberos.login.keytab: /path/to/kerberos/keytab
+#security.kerberos.login.principal: flink-user
+#security.kerberos.login.use-ticket-cache: true
+
+#security.kerberos.login.contexts: Client,KafkaClient
#==============================================================================
# ZK Security Configuration (optional configuration)
#==============================================================================
-# Below configurations are applicable if ZK quorum is configured for Kerberos security
-# SASL authentication is disabled by default and can be enabled by changig the value to false
-#
-# zookeeper.sasl.disable: true
+# Below configurations are applicable if ZK ensemble is configured for security
# Override below configuration to provide custom ZK service name if configured
#
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index 7c41eaf..da8244f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -21,17 +21,22 @@ package org.apache.flink.api.java.hadoop.mapred.utils;
import java.io.File;
import java.lang.reflect.Constructor;
+import java.util.Collection;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +48,8 @@ public final class HadoopUtils {
private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+ private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
/**
* Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
*/
@@ -163,6 +170,20 @@ public final class HadoopUtils {
}
/**
+ * Indicates whether the current user has an HDFS delegation token.
+ */
+ public static boolean hasHDFSDelegationToken() throws Exception {
+ UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+ Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+ for (Token<? extends TokenIdentifier> token : usrTok) {
+ if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Private constructor to prevent instantiation.
*/
private HadoopUtils() {
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 4b9bd82..689c26a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -27,7 +27,6 @@ import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -168,7 +167,6 @@ public class MesosApplicationMasterRunner {
// configure security
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
- sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
SecurityUtils.install(sc);
// run the actual work in the installed security context
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 75b5043..206c71b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -26,7 +26,6 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -118,7 +117,6 @@ public class MesosTaskManagerRunner {
// Run the TM in the security context
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
- sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
SecurityUtils.install(sc);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
index 7fe5b3e..271b32d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.clusterframework.overlays;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.slf4j.Logger;
@@ -34,7 +34,7 @@ import java.io.IOException;
* Overlays cluster-level Kerberos credentials (i.e. keytab) into a container.
*
* The folloowing Flink configuration entries are updated:
- * - security.keytab
+ * - security.kerberos.login.keytab
*/
public class KeytabOverlay extends AbstractContainerOverlay {
@@ -60,7 +60,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
.setDest(TARGET_PATH)
.setCachable(false)
.build());
- container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath());
+ container.getDynamicConfiguration().setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, TARGET_PATH.getPath());
}
}
@@ -69,7 +69,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
}
/**
- * A builder for the {@link HadoopUserOverlay}.
+ * A builder for the {@link KeytabOverlay}.
*/
public static class Builder {
@@ -79,15 +79,15 @@ public class KeytabOverlay extends AbstractContainerOverlay {
* Configures the overlay using the current environment (and global configuration).
*
* The following Flink configuration settings are checked for a keytab:
- * - security.keytab
+ * - security.kerberos.login.keytab
*/
public Builder fromEnvironment(Configuration globalConfiguration) {
- String keytab = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+ String keytab = globalConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if(keytab != null) {
keytabPath = new File(keytab);
if(!keytabPath.exists()) {
throw new IllegalStateException("Invalid configuration for " +
- ConfigConstants.SECURITY_KEYTAB_KEY +
+ SecurityOptions.KERBEROS_LOGIN_KEYTAB +
"; '" + keytab + "' not found.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
new file mode 100644
index 0000000..6af4f23
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A dynamic JAAS configuration.
+ *
+ * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
+ * an (optional) underlying configuration. Entries from the underlying configuration take
+ * precedence over dynamic entries.
+ */
+public class DynamicConfiguration extends Configuration {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DynamicConfiguration.class);
+
+ private final Configuration delegate;
+
+ private final Map<String,AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
+
+ /**
+ * Create a dynamic configuration.
+ * @param delegate an underlying configuration to delegate to, or null.
+ */
+ public DynamicConfiguration(@Nullable Configuration delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * Add entries for the given application name.
+ */
+ public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) {
+ final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+ final AppConfigurationEntry[] updated;
+ if(existing == null) {
+ updated = Arrays.copyOf(entry, entry.length);
+ }
+ else {
+ updated = merge(existing, entry);
+ }
+ dynamicEntries.put(name, updated);
+ }
+
+ /**
+ * Retrieve the AppConfigurationEntries for the specified <i>name</i>
+ * from this Configuration.
+ *
+ * <p>
+ *
+ * @param name the name used to index the Configuration.
+ *
+ * @return an array of AppConfigurationEntries for the specified <i>name</i>
+ * from this Configuration, or null if there are no entries
+ * for the specified <i>name</i>
+ */
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+ AppConfigurationEntry[] entry = null;
+ if(delegate != null) {
+ entry = delegate.getAppConfigurationEntry(name);
+ }
+ final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+ if(existing != null) {
+ if(entry != null) {
+ entry = merge(entry, existing);
+ }
+ else {
+ entry = Arrays.copyOf(existing, existing.length);
+ }
+ }
+ return entry;
+ }
+
+ private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) {
+ AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length);
+ System.arraycopy(b, 0, merged, a.length, b.length);
+ return merged;
+ }
+
+ @Override
+ public void refresh() {
+ if(delegate != null) {
+ delegate.refresh();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
deleted file mode 100644
index c4527dd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.annotation.Internal;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- *
- * JAAS configuration provider object that provides default LoginModule for various connectors that supports
- * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class.
- *
- * Different connectors uses different login module name to implement JAAS based authentication support.
- * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the
- * name to be "client". This sets responsibility on the Flink cluster administrator to configure/provide right
- * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides
- * a standard lookup to get the login module entry for the JAAS based authentication to work.
- *
- * HDFS connector will not be impacted with this configuration since it uses UGI based mechanism to authenticate.
- *
- * <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html">Configuration</a>
- *
- */
-
-@Internal
-public class JaasConfiguration extends Configuration {
-
- private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class);
-
- public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
-
- public static final boolean IBM_JAVA;
-
- private static final Map<String, String> debugOptions = new HashMap<>();
-
- private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
-
- private static final Map<String, String> keytabKerberosOptions = new HashMap<>();
-
- private static final AppConfigurationEntry userKerberosAce;
-
- private AppConfigurationEntry keytabKerberosAce = null;
-
- static {
-
- IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
-
- if(LOG.isDebugEnabled()) {
- debugOptions.put("debug", "true");
- }
-
- if(IBM_JAVA) {
- kerberosCacheOptions.put("useDefaultCcache", "true");
- } else {
- kerberosCacheOptions.put("doNotPrompt", "true");
- kerberosCacheOptions.put("useTicketCache", "true");
- }
-
- String ticketCache = System.getenv("KRB5CCNAME");
- if(ticketCache != null) {
- if(IBM_JAVA) {
- System.setProperty("KRB5CCNAME", ticketCache);
- } else {
- kerberosCacheOptions.put("ticketCache", ticketCache);
- }
- }
-
- kerberosCacheOptions.put("renewTGT", "true");
- kerberosCacheOptions.putAll(debugOptions);
-
- userKerberosAce = new AppConfigurationEntry(
- KerberosUtil.getKrb5LoginModuleName(),
- AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
- kerberosCacheOptions);
-
- }
-
- protected JaasConfiguration(String keytab, String principal) {
-
- LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal);
-
- if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
- (!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal))){
- throw new RuntimeException("Both keytab and principal are required and cannot be empty");
- }
-
- if(!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
-
- if(IBM_JAVA) {
- keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
- keytabKerberosOptions.put("credsType", "both");
- } else {
- keytabKerberosOptions.put("keyTab", keytab);
- keytabKerberosOptions.put("doNotPrompt", "true");
- keytabKerberosOptions.put("useKeyTab", "true");
- keytabKerberosOptions.put("storeKey", "true");
- }
-
- keytabKerberosOptions.put("principal", principal);
- keytabKerberosOptions.put("refreshKrb5Config", "true");
- keytabKerberosOptions.putAll(debugOptions);
-
- keytabKerberosAce = new AppConfigurationEntry(
- KerberosUtil.getKrb5LoginModuleName(),
- AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
- keytabKerberosOptions);
- }
- }
-
- public static Map<String, String> getKeytabKerberosOptions() {
- return keytabKerberosOptions;
- }
-
- private static String prependFileUri(String keytabPath) {
- File f = new File(keytabPath);
- return f.toURI().toString();
- }
-
- @Override
- public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
-
- LOG.debug("JAAS configuration requested for the application entry: {}", applicationName);
-
- AppConfigurationEntry[] appConfigurationEntry;
-
- if(keytabKerberosAce != null) {
- appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce};
- } else {
- appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce};
- }
-
- return appConfigurationEntry;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
new file mode 100644
index 0000000..7ef9187
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ *
+ * Provides vendor-specific Kerberos {@link AppConfigurationEntry} instances.
+ *
+ * The implementation is inspired from Hadoop UGI class.
+ */
+@Internal
+public class KerberosUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);
+
+ private static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
+
+ private static final boolean IBM_JAVA;
+
+ private static final Map<String, String> debugOptions = new HashMap<>();
+
+ private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
+
+ private static final AppConfigurationEntry userKerberosAce;
+
+ static {
+
+ IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
+
+ if(LOG.isDebugEnabled()) {
+ debugOptions.put("debug", "true");
+ }
+
+ if(IBM_JAVA) {
+ kerberosCacheOptions.put("useDefaultCcache", "true");
+ } else {
+ kerberosCacheOptions.put("doNotPrompt", "true");
+ kerberosCacheOptions.put("useTicketCache", "true");
+ }
+
+ String ticketCache = System.getenv("KRB5CCNAME");
+ if(ticketCache != null) {
+ if(IBM_JAVA) {
+ System.setProperty("KRB5CCNAME", ticketCache);
+ } else {
+ kerberosCacheOptions.put("ticketCache", ticketCache);
+ }
+ }
+
+ kerberosCacheOptions.put("renewTGT", "true");
+ kerberosCacheOptions.putAll(debugOptions);
+
+ userKerberosAce = new AppConfigurationEntry(
+ KerberosUtil.getKrb5LoginModuleName(),
+ AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
+ kerberosCacheOptions);
+
+ }
+
+ public static AppConfigurationEntry ticketCacheEntry() {
+ return userKerberosAce;
+ }
+
+ public static AppConfigurationEntry keytabEntry(String keytab, String principal) {
+
+ checkNotNull(keytab, "keytab");
+ checkNotNull(principal, "principal");
+
+ Map<String, String> keytabKerberosOptions = new HashMap<>();
+
+ if(IBM_JAVA) {
+ keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
+ keytabKerberosOptions.put("credsType", "both");
+ } else {
+ keytabKerberosOptions.put("keyTab", keytab);
+ keytabKerberosOptions.put("doNotPrompt", "true");
+ keytabKerberosOptions.put("useKeyTab", "true");
+ keytabKerberosOptions.put("storeKey", "true");
+ }
+
+ keytabKerberosOptions.put("principal", principal);
+ keytabKerberosOptions.put("refreshKrb5Config", "true");
+ keytabKerberosOptions.putAll(debugOptions);
+
+ AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
+ KerberosUtil.getKrb5LoginModuleName(),
+ AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+ keytabKerberosOptions);
+
+ return keytabKerberosAce;
+ }
+
+ private static String prependFileUri(String keytabPath) {
+ File f = new File(keytabPath);
+ return f.toURI().toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index d7fc6ff..d76e7a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -18,216 +18,162 @@
package org.apache.flink.runtime.security;
+import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.security.modules.JaasModule;
+import org.apache.flink.runtime.security.modules.SecurityModule;
+import org.apache.flink.runtime.security.modules.ZooKeeperModule;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.auth.Subject;
import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Method;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/*
- * Utils for configuring security. The following security mechanism are supported:
+ * Utils for configuring security. The following security subsystems are supported:
*
* 1. Java Authentication and Authorization Service (JAAS)
* 2. Hadoop's User Group Information (UGI)
+ * 3. ZooKeeper's process-wide security settings.
*/
public class SecurityUtils {
private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
- public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
-
- public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
-
- private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
-
- private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
-
private static SecurityContext installedContext = new NoOpSecurityContext();
+ private static List<SecurityModule> installedModules = null;
+
public static SecurityContext getInstalledContext() { return installedContext; }
+ @VisibleForTesting
+ static List<SecurityModule> getInstalledModules() {
+ return installedModules;
+ }
+
/**
- * Performs a static initialization of the JAAS and Hadoop UGI security mechanism.
- * It creates the in-memory JAAS configuration object which will serve appropriate
- * ApplicationConfigurationEntry for the connector login module implementation that
- * authenticates Kerberos identity using SASL/JAAS based mechanism.
+ * Installs a process-wide security configuration.
+ *
+ * Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
*/
public static void install(SecurityConfiguration config) throws Exception {
- if (!config.securityIsEnabled()) {
- // do not perform any initialization if no Kerberos crendetails are provided
- return;
+ // install the security modules
+ List<SecurityModule> modules = new ArrayList<>();
+ try {
+ for (Class<? extends SecurityModule> moduleClass : config.getSecurityModules()) {
+ SecurityModule module = moduleClass.newInstance();
+ module.install(config);
+ modules.add(module);
+ }
}
+ catch(Exception ex) {
+ throw new Exception("unable to establish the security context", ex);
+ }
+ installedModules = modules;
- // establish the JAAS config
- JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
- javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
- populateSystemSecurityProperties(config.flinkConf);
-
- // establish the UGI login user
- UserGroupInformation.setConfiguration(config.hadoopConf);
-
- // only configure Hadoop security if we have security enabled
- if (UserGroupInformation.isSecurityEnabled()) {
-
- final UserGroupInformation loginUser;
-
- if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
- String keytabPath = (new File(config.keytab)).getAbsolutePath();
-
- UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
- loginUser = UserGroupInformation.getLoginUser();
-
- // supplement with any available tokens
- String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- if (fileLocation != null) {
- /*
- * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
- * used in the context of reading the stored tokens from UGI.
- * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
- * loginUser.addCredentials(cred);
- */
- try {
- Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
- File.class, org.apache.hadoop.conf.Configuration.class);
- Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
- config.hadoopConf);
- Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
- Credentials.class);
- addCredentialsMethod.invoke(loginUser, cred);
- } catch (NoSuchMethodException e) {
- LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
- }
- }
- } else {
- // login with current user credentials (e.g. ticket cache)
+ // install a security context
+ // use the Hadoop login user as the subject of the installed security context
+ if (!(installedContext instanceof NoOpSecurityContext)) {
+ LOG.warn("overriding previous security context");
+ }
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ installedContext = new HadoopSecurityContext(loginUser);
+ }
+
+ static void uninstall() {
+ if(installedModules != null) {
+ for (SecurityModule module : Lists.reverse(installedModules)) {
try {
- //Use reflection API to get the login user object
- //UserGroupInformation.loginUserFromSubject(null);
- Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
- Subject subject = null;
- loginUserFromSubjectMethod.invoke(null, subject);
- } catch (NoSuchMethodException e) {
- LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+ module.uninstall();
}
-
- // note that the stored tokens are read automatically
- loginUser = UserGroupInformation.getLoginUser();
- }
-
- LOG.info("Hadoop user set to {}", loginUser.toString());
-
- boolean delegationToken = false;
- final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
- Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
- for (Token<? extends TokenIdentifier> token : usrTok) {
- final Text id = new Text(token.getIdentifier());
- LOG.debug("Found user token " + id + " with " + token);
- if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
- delegationToken = true;
+ catch(UnsupportedOperationException ignored) {
}
- }
-
- if (!loginUser.hasKerberosCredentials()) {
- //throw an error in non-yarn deployment if kerberos cache is not available
- if (!delegationToken) {
- LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
- throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+ catch(SecurityModule.SecurityInstallException e) {
+ LOG.warn("unable to uninstall a security module", e);
}
}
-
- if (!(installedContext instanceof NoOpSecurityContext)) {
- LOG.warn("overriding previous security context");
- }
-
- installedContext = new HadoopSecurityContext(loginUser);
+ installedModules = null;
}
- }
- static void clearContext() {
installedContext = new NoOpSecurityContext();
}
- /*
- * This method configures some of the system properties that are require for ZK and Kafka SASL authentication
- * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
- * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
- * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and
- * Kafka current code behavior.
+ /**
+ * The global security configuration.
+ *
+ * See {@link SecurityOptions} for corresponding configuration options.
*/
- private static void populateSystemSecurityProperties(Configuration configuration) {
- Preconditions.checkNotNull(configuration, "The supplied configuration was null");
+ public static class SecurityConfiguration {
- boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
+ private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
+ Arrays.asList(HadoopModule.class, JaasModule.class, ZooKeeperModule.class));
- if (disableSaslClient) {
- LOG.info("SASL client auth for ZK will be disabled");
- //SASL auth is disabled by default but will be enabled if specified in configuration
- System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
- return;
- }
+ private final List<Class<? extends SecurityModule>> securityModules;
- // load Jaas config file to initialize SASL
- final File jaasConfFile;
- try {
- Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
- InputStream jaasConfStream = SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
- Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
- jaasConfFile = jaasConfPath.toFile();
- jaasConfFile.deleteOnExit();
- jaasConfStream.close();
- } catch (IOException e) {
- throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
- "locate pseudo Jaas config provided with Flink", e);
- }
+ private final org.apache.hadoop.conf.Configuration hadoopConf;
- LOG.info("Enabling {} property with pseudo JAAS config file: {}",
- JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
+ private final boolean useTicketCache;
- //ZK client module lookup the configuration to handle SASL.
- //https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
- System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
- System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
+ private final String keytab;
- String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
- if (!StringUtils.isBlank(zkSaslServiceName)) {
- LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
- System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName);
- }
+ private final String principal;
- }
+ private final List<String> loginContextNames;
- /**
- * Inputs for establishing the security context.
- */
- public static class SecurityConfiguration {
+ private final String zkServiceName;
- private Configuration flinkConf;
+ private final String zkLoginContextName;
- private org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+ /**
+ * Create a security configuration from the global configuration.
+ * @param flinkConf the Flink global configuration.
+ */
+ public SecurityConfiguration(Configuration flinkConf) {
+ this(flinkConf, HadoopUtils.getHadoopConfiguration());
+ }
- private String keytab;
+ /**
+ * Create a security configuration from the global configuration.
+ * @param flinkConf the Flink global configuration.
+ * @param hadoopConf the Hadoop configuration.
+ */
+ public SecurityConfiguration(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) {
+ this(flinkConf, hadoopConf, DEFAULT_MODULES);
+ }
- private String principal;
+ /**
+ * Create a security configuration from the global configuration.
+ * @param flinkConf the Flink global configuration.
+ * @param hadoopConf the Hadoop configuration.
+ * @param securityModules the security modules to apply.
+ */
+ public SecurityConfiguration(Configuration flinkConf,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ List<? extends Class<? extends SecurityModule>> securityModules) {
+ this.hadoopConf = checkNotNull(hadoopConf);
+ this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
+ this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
+ this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+ this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
+ this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+ this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
+ this.securityModules = Collections.unmodifiableList(securityModules);
+
+ validate();
+ }
public String getKeytab() {
return keytab;
@@ -237,48 +183,50 @@ public class SecurityUtils {
return principal;
}
- public SecurityConfiguration(Configuration flinkConf) {
- this.flinkConf = flinkConf;
+ public boolean useTicketCache() {
+ return useTicketCache;
+ }
- String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
- String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
- validate(keytab, principal);
+ public org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+ return hadoopConf;
+ }
- this.keytab = keytab;
- this.principal = principal;
+ public List<Class<? extends SecurityModule>> getSecurityModules() {
+ return securityModules;
}
- public SecurityConfiguration setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
- this.hadoopConf = conf;
- return this;
+ public List<String> getLoginContextNames() {
+ return loginContextNames;
}
- private void validate(String keytab, String principal) {
- LOG.debug("keytab {} and principal {} .", keytab, principal);
+ public String getZooKeeperServiceName() {
+ return zkServiceName;
+ }
- if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
- !StringUtils.isBlank(keytab) && StringUtils.isBlank(principal)) {
- if(StringUtils.isBlank(keytab)) {
- LOG.warn("Keytab is null or empty");
- }
+ public String getZooKeeperLoginContextName() {
+ return zkLoginContextName;
+ }
+
+ private void validate() {
+ if(!StringUtils.isBlank(keytab)) {
+ // principal is required
if(StringUtils.isBlank(principal)) {
- LOG.warn("Principal is null or empty");
+ throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab requires a principal.");
}
- throw new RuntimeException("Requires both keytab and principal to be provided");
- }
- if(!StringUtils.isBlank(keytab)) {
+ // check the keytab is readable
File keytabFile = new File(keytab);
- if(!keytabFile.exists() || !keytabFile.isFile()) {
- LOG.warn("Not a valid keytab: {} file", keytab);
- throw new RuntimeException("Invalid keytab file: " + keytab + " passed");
+ if(!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
+ throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab is unreadable");
}
}
-
}
- public boolean securityIsEnabled() {
- return keytab != null && principal != null;
+ private static List<String> parseList(String value) {
+ if(value == null) {
+ return Collections.emptyList();
+ }
+ return Arrays.asList(value.split(","));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
new file mode 100644
index 0000000..9344faf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Responsible for installing a Hadoop login user.
+ */
+public class HadoopModule implements SecurityModule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);
+
+ UserGroupInformation loginUser;
+
+ @Override
+ public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+ UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
+
+ try {
+ if (UserGroupInformation.isSecurityEnabled() &&
+ !StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
+ String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
+
+ UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
+
+ loginUser = UserGroupInformation.getLoginUser();
+
+ // supplement with any available tokens
+ String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ if (fileLocation != null) {
+ /*
+ * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
+ * used in the context of reading the stored tokens from UGI.
+ * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+ * loginUser.addCredentials(cred);
+ */
+ try {
+ Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+ File.class, org.apache.hadoop.conf.Configuration.class);
+ Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
+ securityConfig.getHadoopConfiguration());
+ Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
+ Credentials.class);
+ addCredentialsMethod.invoke(loginUser, cred);
+ } catch (NoSuchMethodException e) {
+ LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+ } catch (InvocationTargetException e) {
+ throw e.getTargetException();
+ }
+ }
+ } else {
+ // login with current user credentials (e.g. ticket cache, OS login)
+ // note that the stored tokens are read automatically
+ try {
+ //Use reflection API to get the login user object
+ //UserGroupInformation.loginUserFromSubject(null);
+ Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+ loginUserFromSubjectMethod.invoke(null, (Subject) null);
+ } catch (NoSuchMethodException e) {
+ LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+ } catch (InvocationTargetException e) {
+ throw e.getTargetException();
+ }
+
+ loginUser = UserGroupInformation.getLoginUser();
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // note: UGI::hasKerberosCredentials inaccurately reports false
+ // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+ // so we check only in ticket cache scenario.
+ if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) {
+ // a delegation token is an adequate substitute in most cases
+ if (!HadoopUtils.hasHDFSDelegationToken()) {
+ LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials");
+ }
+ }
+ }
+
+ LOG.info("Hadoop user set to {}", loginUser);
+
+ } catch (Throwable ex) {
+ throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
+ }
+ }
+
+ @Override
+ public void uninstall() throws SecurityInstallException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
new file mode 100644
index 0000000..f8b9bdf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Responsible for installing a process-wide JAAS configuration.
+ * <p>
+ * The installed configuration combines login modules based on:
+ * - the user-supplied JAAS configuration file, if any
+ * - a Kerberos keytab, if configured
+ * - any cached Kerberos credentials from the current environment
+ * <p>
+ * The module also installs a default JAAS config file (if necessary) for
+ * compatibility with ZK and Kafka. Note that the JRE actually draws on numerous file locations.
+ * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
+ * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+ */
+@Internal
+public class JaasModule implements SecurityModule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class);
+
+ static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+ static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
+
+ private String priorConfigFile;
+ private javax.security.auth.login.Configuration priorConfig;
+
+ private DynamicConfiguration currentConfig;
+
+ @Override
+ public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+ // ensure that a config file is always defined, for compatibility with
+ // ZK and Kafka which check for the system property and existence of the file
+ priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
+ if (priorConfigFile == null) {
+ File configFile = generateDefaultConfigFile();
+ System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
+ }
+
+ // read the JAAS configuration file
+ priorConfig = javax.security.auth.login.Configuration.getConfiguration();
+
+ // construct a dynamic JAAS configuration
+ currentConfig = new DynamicConfiguration(priorConfig);
+
+ // wire up the configured JAAS login contexts to use the krb5 entries
+ AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
+ if(krb5Entries != null) {
+ for (String app : securityConfig.getLoginContextNames()) {
+ currentConfig.addAppConfigurationEntry(app, krb5Entries);
+ }
+ }
+
+ javax.security.auth.login.Configuration.setConfiguration(currentConfig);
+ }
+
+ @Override
+ public void uninstall() throws SecurityInstallException {
+ if(priorConfigFile != null) {
+ System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
+ } else {
+ System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
+ }
+ javax.security.auth.login.Configuration.setConfiguration(priorConfig);
+ }
+
+ public DynamicConfiguration getCurrentConfiguration() {
+ return currentConfig;
+ }
+
+ private static AppConfigurationEntry[] getAppConfigurationEntries(SecurityUtils.SecurityConfiguration securityConfig) {
+
+ AppConfigurationEntry userKerberosAce = null;
+ if (securityConfig.useTicketCache()) {
+ userKerberosAce = KerberosUtils.ticketCacheEntry();
+ }
+ AppConfigurationEntry keytabKerberosAce = null;
+ if (securityConfig.getKeytab() != null) {
+ keytabKerberosAce = KerberosUtils.keytabEntry(securityConfig.getKeytab(), securityConfig.getPrincipal());
+ }
+
+ AppConfigurationEntry[] appConfigurationEntry;
+ if (userKerberosAce != null && keytabKerberosAce != null) {
+ appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce, userKerberosAce};
+ } else if (keytabKerberosAce != null) {
+ appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce};
+ } else if (userKerberosAce != null) {
+ appConfigurationEntry = new AppConfigurationEntry[]{userKerberosAce};
+ } else {
+ return null;
+ }
+
+ return appConfigurationEntry;
+ }
+
+ /**
+ * Generate the default JAAS config file.
+ */
+ private static File generateDefaultConfigFile() {
+ final File jaasConfFile;
+ try {
+ Path jaasConfPath = Files.createTempFile("jaas-", ".conf");
+ try (InputStream resourceStream = JaasModule.class.getClassLoader().getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
+ Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+ }
+ jaasConfFile = jaasConfPath.toFile();
+ jaasConfFile.deleteOnExit();
+ } catch (IOException e) {
+ throw new RuntimeException("unable to generate a JAAS configuration file", e);
+ }
+ return jaasConfFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
new file mode 100644
index 0000000..fbe1db9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+import java.security.GeneralSecurityException;
+
+/**
+ * An installable security module.
+ */
+public interface SecurityModule {
+
+ /**
+ * Install the security module.
+ *
+ * @param configuration the security configuration.
+ * @throws SecurityInstallException if the security module couldn't be installed.
+ */
+ void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException;
+
+ /**
+ * Uninstall the security module.
+ *
+ * @throws SecurityInstallException if the security module couldn't be uninstalled.
+ * @throws UnsupportedOperationException if the security module doesn't support uninstallation.
+ */
+ void uninstall() throws SecurityInstallException;
+
+ /**
+ * Indicates a problem with installing or uninstalling a security module.
+ */
+ class SecurityInstallException extends GeneralSecurityException {
+ private static final long serialVersionUID = 1L;
+
+ public SecurityInstallException(String msg) {
+ super(msg);
+ }
+
+ public SecurityInstallException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
new file mode 100644
index 0000000..c0ba4a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+/**
+ * Responsible for installing a process-wide ZooKeeper security configuration.
+ */
+public class ZooKeeperModule implements SecurityModule {
+
+ private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+ /**
+ * A system property for setting whether ZK uses SASL.
+ */
+ private static final String ZK_ENABLE_CLIENT_SASL = "zookeeper.sasl.client";
+
+ /**
+ * A system property for setting the expected ZooKeeper service name.
+ */
+ private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+ /**
+ * A system property for setting the login context name to use.
+ */
+ private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
+
+ private String priorServiceName;
+
+ private String priorLoginContextName;
+
+ @Override
+ public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+
+ priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
+ if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
+ System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
+ }
+
+ priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
+ if (!"Client".equals(configuration.getZooKeeperLoginContextName())) {
+ System.setProperty(ZK_LOGIN_CONTEXT_NAME, configuration.getZooKeeperLoginContextName());
+ }
+ }
+
+ @Override
+ public void uninstall() throws SecurityInstallException {
+ if(priorServiceName != null) {
+ System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
+ } else {
+ System.clearProperty(ZK_SASL_CLIENT_USERNAME);
+ }
+ if(priorLoginContextName != null) {
+ System.setProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
+ } else {
+ System.clearProperty(ZK_LOGIN_CONTEXT_NAME);
+ }
+ }
+
+}