You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/11 20:19:45 UTC
[7/8] flink git commit: [FLINK-5364] [security] Rework JAAS
configuration to support user-supplied entries
[FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries
Fixes FLINK-5364, FLINK-5361, FLINK-5350, FLINK-5055
This closes #3057
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc3a778c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc3a778c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc3a778c
Branch: refs/heads/master
Commit: fc3a778c0cafe1adc9efbd8796a8bd64122e4ad2
Parents: 03b62ae
Author: wrighe3 <er...@emc.com>
Authored: Tue Dec 20 01:07:38 2016 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 11 20:17:44 2017 +0100
----------------------------------------------------------------------
docs/internals/flink_security.md | 133 +++++---
docs/setup/config.md | 55 +++-
.../connectors/fs/RollingSinkSecuredITCase.java | 8 +-
.../flink/configuration/ConfigConstants.java | 14 -
.../configuration/HighAvailabilityOptions.java | 8 -
.../flink/configuration/SecurityOptions.java | 62 ++++
flink-dist/src/main/resources/flink-conf.yaml | 21 +-
.../java/hadoop/mapred/utils/HadoopUtils.java | 21 ++
.../MesosApplicationMasterRunner.java | 2 -
.../MesosTaskManagerRunner.java | 2 -
.../overlays/KeytabOverlay.java | 14 +-
.../runtime/security/DynamicConfiguration.java | 111 +++++++
.../runtime/security/JaasConfiguration.java | 160 ---------
.../flink/runtime/security/KerberosUtils.java | 125 +++++++
.../flink/runtime/security/SecurityUtils.java | 322 ++++++++-----------
.../runtime/security/modules/HadoopModule.java | 119 +++++++
.../runtime/security/modules/JaasModule.java | 146 +++++++++
.../security/modules/SecurityModule.java | 59 ++++
.../security/modules/ZooKeeperModule.java | 76 +++++
.../src/main/resources/flink-jaas.conf | 9 +-
.../overlays/KeytabOverlayTest.java | 5 +-
.../runtime/security/JaasConfigurationTest.java | 52 ---
.../runtime/security/KerberosUtilsTest.java | 48 +++
.../runtime/security/SecurityUtilsTest.java | 105 +++---
.../flink/test/util/SecureTestEnvironment.java | 48 +--
.../test/util/TestingJaasConfiguration.java | 106 ------
.../flink/test/util/TestingSecurityContext.java | 38 +--
.../yarn/YARNSessionFIFOSecuredITCase.java | 10 +-
.../org/apache/flink/yarn/YarnTestBase.java | 5 +-
.../yarn/AbstractYarnClusterDescriptor.java | 26 +-
...bstractYarnFlinkApplicationMasterRunner.java | 25 +-
.../flink/yarn/YarnApplicationMasterRunner.java | 31 +-
.../flink/yarn/YarnTaskExecutorRunner.java | 25 +-
.../flink/yarn/YarnTaskManagerRunner.java | 25 +-
34 files changed, 1219 insertions(+), 797 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
index 846273b..a83f3b9 100644
--- a/docs/internals/flink_security.md
+++ b/docs/internals/flink_security.md
@@ -24,64 +24,123 @@ specific language governing permissions and limitations
under the License.
-->
-This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN)
-and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers
-who plans to run Flink on a secure environment.
+This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos),
+filesystems, connectors, and state backends.
## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase)
-The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario,
-streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure
-data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the
-context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster.
+In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure
+data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. To use a different keytab
+for for a certain job, simply launch a separate Flink cluster with a different configuration. Numerous Flink clusters may run side-by-side in a YARN
+or Mesos environment.
## How Flink Security works
-Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI.
-A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security
-requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolves over a period
-of time, at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security.
+In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.). While satisfying the security requirements for all connectors is an ongoing effort,
+Flink provides first-class support for Kerberos authentication only. The following services and connectors are tested for Kerberos authentication:
-- Kafka (0.9)
+- Kafka (0.9+)
- HDFS
+- HBase
- ZooKeeper
-Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation
-(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context.
+Note that it is possible to enable the use of Kerberos independently for each service or connector. For example, the user may enable
+Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa. The shared element is the configuration of
+Kerbreros credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup. The next section describes each security module.
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context. The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
+
+If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured. Otherwise,
+the login user conveys only the user identity of the OS account that launched the cluster.
+
+### JAAS Security Module
+This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
+Kafka, and other such components that rely on JAAS.
+
+Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html). Static entries override any
+dynamic entries provided by this module.
+
+### ZooKeeper Security Module
+This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
+and the JAAS login context name (default: `Client`).
+
+## Security Configuration
+
+### Flink Configuration
+The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
-Services like Kafka and ZooKeeper use SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. It expects JAAS configuration with a platform-specific login
-module *name* to be provided. Managing per-connector configuration files will be an overhead and to overcome this requirement, a process-wide JAAS configuration object is
-instantiated which serves standard ApplicationConfigurationEntry for the connectors that authenticates using SASL/JAAS mechanism.
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
-It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself uses UGI's doAS() implementation to run under a specific user context, i.e. if Hadoop security is enabled
-then the Flink processes will be running under a secure user account or else it will run as the OS login user account who starts the Flink cluster.
+A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
-## Security Configurations
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-Secure credentials can be supplied by adding below configuration elements to Flink configuration file:
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret.
+These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context. Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section). To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
-- `security.principal`: User principal name that the Flink cluster should run as.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
-The delegation token mechanism (*kinit cache*) is still supported for backward compatibility but enabling security using *keytab* configuration is the preferred and recommended approach.
+ZooKeeper-related configuration overrides:
-## Standalone Mode:
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.
+
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
+
+### Hadoop Configuration
+
+The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`). The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
+
+Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts. In this scenario, the Flink CLI acquires Hadoop
+delegation tokens (for HDFS and for HBase).
+
+## Deployment Modes
+Here is some information specific to each deployment mode.
+
+### Standalone Mode
Steps to run a secure Flink cluster in standalone/cluster mode:
-- Add security configurations to Flink configuration file (on all cluster nodes)
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration on all cluster nodes
-- Deploy Flink cluster using cluster start/stop scripts or CLI
+1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
+3. Deploy Flink cluster as normal.
+
+### YARN/Mesos Mode
+
+Steps to run a secure Flink cluster in YARN/Mesos mode:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Ensure that the keytab file exists at the path as indicated by `security.kerberos.login.keytab` on the client node.
+3. Deploy Flink cluster as normal.
+
+In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.
-## Yarn Mode:
+For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
-Steps to run secure Flink cluster in Yarn mode:
-- Add security configurations to Flink configuration file (on the node from where cluster will be provisioned using Flink/Yarn CLI)
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration
-- Deploy Flink cluster using CLI
+#### Using `kinit` (YARN only)
-In Yarn mode, the user supplied keytab will be copied over to the Yarn containers (App Master/JM and TM) as the Yarn local resource file.
-Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">Yarn security</a>
+In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
+This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it. The main drawback is
+that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
-## Token Renewal
+Steps to run a secure Flink cluster using `kinit`:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Login using the `kinit` command.
+3. Deploy Flink cluster as normal.
-UGI and Kafka/ZK login module implementations takes care of auto-renewing the tickets upon reaching expiry and no further action is needed on the part of Flink.
\ No newline at end of file
+## Further Details
+### Ticket Renewal
+Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
+Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab. In the delegation token scenario,
+YARN itself renews the token (up to its maximum lifespan).
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 724fe33..1b2be8a 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -96,37 +96,58 @@ These options are useful for debugging a Flink application for memory and garbag
- `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
-### Kerberos
+### Kerberos-based Security
-Flink supports Kerberos authentication for the following services
+Flink supports Kerberos authentication for the following services:
-+ Hadoop Components: such as HDFS, YARN, or HBase.
++ Hadoop Components (such as HDFS, YARN, or HBase)
+ Kafka Connectors (version 0.9+)
-+ Zookeeper Server/Client
++ Zookeeper
-Hadoop components relies on the UserGroupInformation (UGI) implementation to handle Kerberos authentication, whereas Kafka and Zookeeper services handles Kerberos authentication through SASL/JAAS implementation.
-
-**Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
+**Kerberos is supported only in Hadoop version 2.6.1 and above. All
other versions have critical bugs which might fail the Flink job
unexpectedly.**
-**Ticket cache** and **Keytab** modes are supported for all above mentioned services.
+Configuring Flink for Kerberos security involves three aspects:
+
+1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via `kinit`)
+2. Making the Kerberos credential available to components and connectors as needed
+3. Configuring the component and/or connector to use Kerberos authentication
+
+To provide the cluster with a Kerberos credential, either configure the login keytab using the below configuration options,
+or login using `kinit` before starting the cluster.
+
+It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues. If you prefer to use the ticket cache,
+talk to your administrator about increasing the Hadoop delegation token lifetime.
+
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from your Kerberos ticket cache (default: `true`).
+
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-> Ticket cache (Supported only to provide backward compatibility support. Keytab is the preferred approach for long running jobs)
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-While Hadoop uses Kerberos tickets to authenticate users with services initially, the authentication process continues differently afterwards. Instead of saving the ticket to authenticate on a later access, Hadoop creates its own security tokens (DelegationToken) that it passes around. These are authenticated to Kerberos periodically but are independent of the token renewal time. The tokens have a maximum life span identical to the Kerberos ticket maximum life span.
+If Hadoop security is enabled (in `core-site.xml`), Flink will automatically use the configured Kerberos credentials when connecting to HDFS, HBase, and other Hadoop components.
-While using ticket cache mode, please make sure to set the maximum ticket life span high long running jobs.
+Make the Kerberos credentials available to any connector or component that uses a JAAS configuration file by configuring JAAS login contexts.
-If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication).
+
+You may also provide a static JAAS configuration file, whose entries override those produced by the above configuration option.
+
+Be sure to configure the connector within your Flink program as necessary to use Kerberos authentication. For the Kafka connector,
+use the following properties:
+
+```
+security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+sasl.kerberos.service.name=kafka
+```
-> Keytab (security principal and keytab can be configured through Flink configuration file)
-- `security.keytab`: Path to Keytab file
-- `security.principal`: Principal associated with the keytab
+Flink provides some additional options to configure ZooKeeper security:
-Kerberos ticket renewal is abstracted and automatically handled by the Hadoop/Kafka/ZK login modules and ensures that tickets are renewed in time and you can be sure to be authenticated until the end of the ticket life time.
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`).
-For Kafka and ZK, process-wide JAAS config will be created using the provided security credentials and the Kerberos authentication will be handled by Kafka/ZK login handlers.
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
### Other
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index eb12d07..fa46fc7 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SecureTestEnvironment;
@@ -116,13 +117,12 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
populateSecureConfigurations();
Configuration flinkConfig = new Configuration();
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
SecureTestEnvironment.getTestKeytab());
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
SecureTestEnvironment.getHadoopServicePrincipal());
- SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
- ctx.setHadoopConfiguration(conf);
+ SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig, conf);
try {
TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index eabb754..fc389e0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1395,20 +1395,6 @@ public final class ConfigConstants {
/** The environment variable name which contains the Flink installation root directory */
public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
- // -------------------------------- Security -------------------------------
-
- /**
- * The config parameter defining security credentials required
- * for securing Flink cluster.
- */
-
- /** Keytab file key name to be used in flink configuration file */
- public static final String SECURITY_KEYTAB_KEY = "security.keytab";
-
- /** Kerberos security principal key name to be used in flink configuration file */
- public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
-
-
/**
* Not instantiable.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 1ee988a..4792eba 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -124,14 +124,6 @@ public class HighAvailabilityOptions {
.defaultValue(3)
.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
- public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
- key("zookeeper.sasl.disable")
- .defaultValue(true);
-
- public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
- key("zookeeper.sasl.service-name")
- .noDefaultValue();
-
// ------------------------------------------------------------------------
/** Not intended to be instantiated */
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
new file mode 100644
index 0000000..67d101d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to security.
+ */
+public class SecurityOptions {
+
+ // ------------------------------------------------------------------------
+ // Kerberos Options
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption<String> KERBEROS_LOGIN_PRINCIPAL =
+ key("security.kerberos.login.principal")
+ .noDefaultValue()
+ .withDeprecatedKeys("security.principal");
+
+ public static final ConfigOption<String> KERBEROS_LOGIN_KEYTAB =
+ key("security.kerberos.login.keytab")
+ .noDefaultValue()
+ .withDeprecatedKeys("security.keytab");
+
+ public static final ConfigOption<Boolean> KERBEROS_LOGIN_USETICKETCACHE =
+ key("security.kerberos.login.use-ticket-cache")
+ .defaultValue(true);
+
+ public static final ConfigOption<String> KERBEROS_LOGIN_CONTEXTS =
+ key("security.kerberos.login.contexts")
+ .noDefaultValue();
+
+
+ // ------------------------------------------------------------------------
+ // ZooKeeper Security Options
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
+ key("zookeeper.sasl.service-name")
+ .defaultValue("zookeeper");
+
+ public static final ConfigOption<String> ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME =
+ key("zookeeper.sasl.login-context-name")
+ .defaultValue("Client");
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index c650cfe..f759db6 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -162,21 +162,24 @@ jobmanager.web.port: 8081
# Flink Cluster Security Configuration (optional configuration)
#==============================================================================
-# Kerberos security for the connectors can be enabled by providing below configurations
-# Security works in two modes - keytab/principal combination or using the Kerberos token cache
-# If keytab and principal are not provided, token cache (manual kinit) will be used
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
-#security.keytab: /path/to/kerberos/keytab
-#security.principal: flink-user
+#security.kerberos.login.keytab: /path/to/kerberos/keytab
+#security.kerberos.login.principal: flink-user
+#security.kerberos.login.use-ticket-cache: true
+
+#security.kerberos.login.contexts: Client,KafkaClient
#==============================================================================
# ZK Security Configuration (optional configuration)
#==============================================================================
-# Below configurations are applicable if ZK quorum is configured for Kerberos security
-# SASL authentication is disabled by default and can be enabled by changig the value to false
-#
-# zookeeper.sasl.disable: true
+# Below configurations are applicable if ZK ensemble is configured for security
# Override below configuration to provide custom ZK service name if configured
#
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index 7c41eaf..da8244f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -21,17 +21,22 @@ package org.apache.flink.api.java.hadoop.mapred.utils;
import java.io.File;
import java.lang.reflect.Constructor;
+import java.util.Collection;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +48,8 @@ public final class HadoopUtils {
private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+ private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
/**
* Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
*/
@@ -163,6 +170,20 @@ public final class HadoopUtils {
}
/**
+ * Indicates whether the current user has an HDFS delegation token.
+ */
+ public static boolean hasHDFSDelegationToken() throws Exception {
+ UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+ Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+ for (Token<? extends TokenIdentifier> token : usrTok) {
+ if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Private constructor to prevent instantiation.
*/
private HadoopUtils() {
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 4b9bd82..689c26a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -27,7 +27,6 @@ import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -168,7 +167,6 @@ public class MesosApplicationMasterRunner {
// configure security
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
- sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
SecurityUtils.install(sc);
// run the actual work in the installed security context
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 75b5043..206c71b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -26,7 +26,6 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -118,7 +117,6 @@ public class MesosTaskManagerRunner {
// Run the TM in the security context
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
- sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
SecurityUtils.install(sc);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
index 7fe5b3e..271b32d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.clusterframework.overlays;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.slf4j.Logger;
@@ -34,7 +34,7 @@ import java.io.IOException;
* Overlays cluster-level Kerberos credentials (i.e. keytab) into a container.
*
* The folloowing Flink configuration entries are updated:
- * - security.keytab
+ * - security.kerberos.login.keytab
*/
public class KeytabOverlay extends AbstractContainerOverlay {
@@ -60,7 +60,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
.setDest(TARGET_PATH)
.setCachable(false)
.build());
- container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath());
+ container.getDynamicConfiguration().setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, TARGET_PATH.getPath());
}
}
@@ -69,7 +69,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
}
/**
- * A builder for the {@link HadoopUserOverlay}.
+ * A builder for the {@link KeytabOverlay}.
*/
public static class Builder {
@@ -79,15 +79,15 @@ public class KeytabOverlay extends AbstractContainerOverlay {
* Configures the overlay using the current environment (and global configuration).
*
* The following Flink configuration settings are checked for a keytab:
- * - security.keytab
+ * - security.kerberos.login.keytab
*/
public Builder fromEnvironment(Configuration globalConfiguration) {
- String keytab = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+ String keytab = globalConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if(keytab != null) {
keytabPath = new File(keytab);
if(!keytabPath.exists()) {
throw new IllegalStateException("Invalid configuration for " +
- ConfigConstants.SECURITY_KEYTAB_KEY +
+ SecurityOptions.KERBEROS_LOGIN_KEYTAB +
"; '" + keytab + "' not found.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
new file mode 100644
index 0000000..6af4f23
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A dynamic JAAS configuration.
+ *
+ * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
+ * an (optional) underlying configuration. Entries from the underlying configuration take
+ * precedence over dynamic entries.
+ */
+public class DynamicConfiguration extends Configuration {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DynamicConfiguration.class);
+
+ private final Configuration delegate;
+
+ private final Map<String,AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
+
+ /**
+ * Create a dynamic configuration.
+ * @param delegate an underlying configuration to delegate to, or null.
+ */
+ public DynamicConfiguration(@Nullable Configuration delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * Add entries for the given application name.
+ */
+ public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) {
+ final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+ final AppConfigurationEntry[] updated;
+ if(existing == null) {
+ updated = Arrays.copyOf(entry, entry.length);
+ }
+ else {
+ updated = merge(existing, entry);
+ }
+ dynamicEntries.put(name, updated);
+ }
+
+ /**
+ * Retrieve the AppConfigurationEntries for the specified <i>name</i>
+ * from this Configuration.
+ *
+ * <p>
+ *
+ * @param name the name used to index the Configuration.
+ *
+ * @return an array of AppConfigurationEntries for the specified <i>name</i>
+ * from this Configuration, or null if there are no entries
+ * for the specified <i>name</i>
+ */
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+ AppConfigurationEntry[] entry = null;
+ if(delegate != null) {
+ entry = delegate.getAppConfigurationEntry(name);
+ }
+ final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+ if(existing != null) {
+ if(entry != null) {
+ entry = merge(entry, existing);
+ }
+ else {
+ entry = Arrays.copyOf(existing, existing.length);
+ }
+ }
+ return entry;
+ }
+
+ private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) {
+ AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length);
+ System.arraycopy(b, 0, merged, a.length, b.length);
+ return merged;
+ }
+
+ @Override
+ public void refresh() {
+ if(delegate != null) {
+ delegate.refresh();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
deleted file mode 100644
index c4527dd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.annotation.Internal;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- *
- * JAAS configuration provider object that provides default LoginModule for various connectors that supports
- * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class.
- *
- * Different connectors uses different login module name to implement JAAS based authentication support.
- * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the
- * name to be "client". This sets responsibility on the Flink cluster administrator to configure/provide right
- * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides
- * a standard lookup to get the login module entry for the JAAS based authentication to work.
- *
- * HDFS connector will not be impacted with this configuration since it uses UGI based mechanism to authenticate.
- *
- * <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html">Configuration</a>
- *
- */
-
-@Internal
-public class JaasConfiguration extends Configuration {
-
- private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class);
-
- public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
-
- public static final boolean IBM_JAVA;
-
- private static final Map<String, String> debugOptions = new HashMap<>();
-
- private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
-
- private static final Map<String, String> keytabKerberosOptions = new HashMap<>();
-
- private static final AppConfigurationEntry userKerberosAce;
-
- private AppConfigurationEntry keytabKerberosAce = null;
-
- static {
-
- IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
-
- if(LOG.isDebugEnabled()) {
- debugOptions.put("debug", "true");
- }
-
- if(IBM_JAVA) {
- kerberosCacheOptions.put("useDefaultCcache", "true");
- } else {
- kerberosCacheOptions.put("doNotPrompt", "true");
- kerberosCacheOptions.put("useTicketCache", "true");
- }
-
- String ticketCache = System.getenv("KRB5CCNAME");
- if(ticketCache != null) {
- if(IBM_JAVA) {
- System.setProperty("KRB5CCNAME", ticketCache);
- } else {
- kerberosCacheOptions.put("ticketCache", ticketCache);
- }
- }
-
- kerberosCacheOptions.put("renewTGT", "true");
- kerberosCacheOptions.putAll(debugOptions);
-
- userKerberosAce = new AppConfigurationEntry(
- KerberosUtil.getKrb5LoginModuleName(),
- AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
- kerberosCacheOptions);
-
- }
-
- protected JaasConfiguration(String keytab, String principal) {
-
- LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal);
-
- if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
- (!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal))){
- throw new RuntimeException("Both keytab and principal are required and cannot be empty");
- }
-
- if(!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
-
- if(IBM_JAVA) {
- keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
- keytabKerberosOptions.put("credsType", "both");
- } else {
- keytabKerberosOptions.put("keyTab", keytab);
- keytabKerberosOptions.put("doNotPrompt", "true");
- keytabKerberosOptions.put("useKeyTab", "true");
- keytabKerberosOptions.put("storeKey", "true");
- }
-
- keytabKerberosOptions.put("principal", principal);
- keytabKerberosOptions.put("refreshKrb5Config", "true");
- keytabKerberosOptions.putAll(debugOptions);
-
- keytabKerberosAce = new AppConfigurationEntry(
- KerberosUtil.getKrb5LoginModuleName(),
- AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
- keytabKerberosOptions);
- }
- }
-
- public static Map<String, String> getKeytabKerberosOptions() {
- return keytabKerberosOptions;
- }
-
- private static String prependFileUri(String keytabPath) {
- File f = new File(keytabPath);
- return f.toURI().toString();
- }
-
- @Override
- public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
-
- LOG.debug("JAAS configuration requested for the application entry: {}", applicationName);
-
- AppConfigurationEntry[] appConfigurationEntry;
-
- if(keytabKerberosAce != null) {
- appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce};
- } else {
- appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce};
- }
-
- return appConfigurationEntry;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
new file mode 100644
index 0000000..7ef9187
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ *
+ * Provides vendor-specific Kerberos {@link AppConfigurationEntry} instances.
+ *
+ * The implementation is inspired from Hadoop UGI class.
+ */
+@Internal
+public class KerberosUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);
+
+ private static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
+
+ private static final boolean IBM_JAVA;
+
+ private static final Map<String, String> debugOptions = new HashMap<>();
+
+ private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
+
+ private static final AppConfigurationEntry userKerberosAce;
+
+ static {
+
+ IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
+
+ if(LOG.isDebugEnabled()) {
+ debugOptions.put("debug", "true");
+ }
+
+ if(IBM_JAVA) {
+ kerberosCacheOptions.put("useDefaultCcache", "true");
+ } else {
+ kerberosCacheOptions.put("doNotPrompt", "true");
+ kerberosCacheOptions.put("useTicketCache", "true");
+ }
+
+ String ticketCache = System.getenv("KRB5CCNAME");
+ if(ticketCache != null) {
+ if(IBM_JAVA) {
+ System.setProperty("KRB5CCNAME", ticketCache);
+ } else {
+ kerberosCacheOptions.put("ticketCache", ticketCache);
+ }
+ }
+
+ kerberosCacheOptions.put("renewTGT", "true");
+ kerberosCacheOptions.putAll(debugOptions);
+
+ userKerberosAce = new AppConfigurationEntry(
+ KerberosUtil.getKrb5LoginModuleName(),
+ AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
+ kerberosCacheOptions);
+
+ }
+
+ public static AppConfigurationEntry ticketCacheEntry() {
+ return userKerberosAce;
+ }
+
+ public static AppConfigurationEntry keytabEntry(String keytab, String principal) {
+
+ checkNotNull(keytab, "keytab");
+ checkNotNull(principal, "principal");
+
+ Map<String, String> keytabKerberosOptions = new HashMap<>();
+
+ if(IBM_JAVA) {
+ keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
+ keytabKerberosOptions.put("credsType", "both");
+ } else {
+ keytabKerberosOptions.put("keyTab", keytab);
+ keytabKerberosOptions.put("doNotPrompt", "true");
+ keytabKerberosOptions.put("useKeyTab", "true");
+ keytabKerberosOptions.put("storeKey", "true");
+ }
+
+ keytabKerberosOptions.put("principal", principal);
+ keytabKerberosOptions.put("refreshKrb5Config", "true");
+ keytabKerberosOptions.putAll(debugOptions);
+
+ AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
+ KerberosUtil.getKrb5LoginModuleName(),
+ AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+ keytabKerberosOptions);
+
+ return keytabKerberosAce;
+ }
+
+ private static String prependFileUri(String keytabPath) {
+ File f = new File(keytabPath);
+ return f.toURI().toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index d7fc6ff..d76e7a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -18,216 +18,162 @@
package org.apache.flink.runtime.security;
+import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.security.modules.JaasModule;
+import org.apache.flink.runtime.security.modules.SecurityModule;
+import org.apache.flink.runtime.security.modules.ZooKeeperModule;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.auth.Subject;
import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Method;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/*
- * Utils for configuring security. The following security mechanism are supported:
+ * Utils for configuring security. The following security subsystems are supported:
*
* 1. Java Authentication and Authorization Service (JAAS)
* 2. Hadoop's User Group Information (UGI)
+ * 3. ZooKeeper's process-wide security settings.
*/
public class SecurityUtils {
private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
- public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
-
- public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
-
- private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
-
- private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
-
private static SecurityContext installedContext = new NoOpSecurityContext();
+ private static List<SecurityModule> installedModules = null;
+
public static SecurityContext getInstalledContext() { return installedContext; }
+ @VisibleForTesting
+ static List<SecurityModule> getInstalledModules() {
+ return installedModules;
+ }
+
/**
- * Performs a static initialization of the JAAS and Hadoop UGI security mechanism.
- * It creates the in-memory JAAS configuration object which will serve appropriate
- * ApplicationConfigurationEntry for the connector login module implementation that
- * authenticates Kerberos identity using SASL/JAAS based mechanism.
+ * Installs a process-wide security configuration.
+ *
+ * Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
*/
public static void install(SecurityConfiguration config) throws Exception {
- if (!config.securityIsEnabled()) {
- // do not perform any initialization if no Kerberos crendetails are provided
- return;
+ // install the security modules
+ List<SecurityModule> modules = new ArrayList<>();
+ try {
+ for (Class<? extends SecurityModule> moduleClass : config.getSecurityModules()) {
+ SecurityModule module = moduleClass.newInstance();
+ module.install(config);
+ modules.add(module);
+ }
}
+ catch(Exception ex) {
+ throw new Exception("unable to establish the security context", ex);
+ }
+ installedModules = modules;
- // establish the JAAS config
- JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
- javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
- populateSystemSecurityProperties(config.flinkConf);
-
- // establish the UGI login user
- UserGroupInformation.setConfiguration(config.hadoopConf);
-
- // only configure Hadoop security if we have security enabled
- if (UserGroupInformation.isSecurityEnabled()) {
-
- final UserGroupInformation loginUser;
-
- if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
- String keytabPath = (new File(config.keytab)).getAbsolutePath();
-
- UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
- loginUser = UserGroupInformation.getLoginUser();
-
- // supplement with any available tokens
- String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- if (fileLocation != null) {
- /*
- * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
- * used in the context of reading the stored tokens from UGI.
- * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
- * loginUser.addCredentials(cred);
- */
- try {
- Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
- File.class, org.apache.hadoop.conf.Configuration.class);
- Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
- config.hadoopConf);
- Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
- Credentials.class);
- addCredentialsMethod.invoke(loginUser, cred);
- } catch (NoSuchMethodException e) {
- LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
- }
- }
- } else {
- // login with current user credentials (e.g. ticket cache)
+ // install a security context
+ // use the Hadoop login user as the subject of the installed security context
+ if (!(installedContext instanceof NoOpSecurityContext)) {
+ LOG.warn("overriding previous security context");
+ }
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ installedContext = new HadoopSecurityContext(loginUser);
+ }
+
+ static void uninstall() {
+ if(installedModules != null) {
+ for (SecurityModule module : Lists.reverse(installedModules)) {
try {
- //Use reflection API to get the login user object
- //UserGroupInformation.loginUserFromSubject(null);
- Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
- Subject subject = null;
- loginUserFromSubjectMethod.invoke(null, subject);
- } catch (NoSuchMethodException e) {
- LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+ module.uninstall();
}
-
- // note that the stored tokens are read automatically
- loginUser = UserGroupInformation.getLoginUser();
- }
-
- LOG.info("Hadoop user set to {}", loginUser.toString());
-
- boolean delegationToken = false;
- final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
- Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
- for (Token<? extends TokenIdentifier> token : usrTok) {
- final Text id = new Text(token.getIdentifier());
- LOG.debug("Found user token " + id + " with " + token);
- if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
- delegationToken = true;
+ catch(UnsupportedOperationException ignored) {
}
- }
-
- if (!loginUser.hasKerberosCredentials()) {
- //throw an error in non-yarn deployment if kerberos cache is not available
- if (!delegationToken) {
- LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
- throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
+ catch(SecurityModule.SecurityInstallException e) {
+ LOG.warn("unable to uninstall a security module", e);
}
}
-
- if (!(installedContext instanceof NoOpSecurityContext)) {
- LOG.warn("overriding previous security context");
- }
-
- installedContext = new HadoopSecurityContext(loginUser);
+ installedModules = null;
}
- }
- static void clearContext() {
installedContext = new NoOpSecurityContext();
}
- /*
- * This method configures some of the system properties that are require for ZK and Kafka SASL authentication
- * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
- * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
- * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and
- * Kafka current code behavior.
+ /**
+ * The global security configuration.
+ *
+ * See {@link SecurityOptions} for corresponding configuration options.
*/
- private static void populateSystemSecurityProperties(Configuration configuration) {
- Preconditions.checkNotNull(configuration, "The supplied configuration was null");
+ public static class SecurityConfiguration {
- boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
+ private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
+ Arrays.asList(HadoopModule.class, JaasModule.class, ZooKeeperModule.class));
- if (disableSaslClient) {
- LOG.info("SASL client auth for ZK will be disabled");
- //SASL auth is disabled by default but will be enabled if specified in configuration
- System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
- return;
- }
+ private final List<Class<? extends SecurityModule>> securityModules;
- // load Jaas config file to initialize SASL
- final File jaasConfFile;
- try {
- Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
- InputStream jaasConfStream = SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
- Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
- jaasConfFile = jaasConfPath.toFile();
- jaasConfFile.deleteOnExit();
- jaasConfStream.close();
- } catch (IOException e) {
- throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
- "locate pseudo Jaas config provided with Flink", e);
- }
+ private final org.apache.hadoop.conf.Configuration hadoopConf;
- LOG.info("Enabling {} property with pseudo JAAS config file: {}",
- JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
+ private final boolean useTicketCache;
- //ZK client module lookup the configuration to handle SASL.
- //https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
- System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
- System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
+ private final String keytab;
- String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
- if (!StringUtils.isBlank(zkSaslServiceName)) {
- LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
- System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName);
- }
+ private final String principal;
- }
+ private final List<String> loginContextNames;
- /**
- * Inputs for establishing the security context.
- */
- public static class SecurityConfiguration {
+ private final String zkServiceName;
- private Configuration flinkConf;
+ private final String zkLoginContextName;
- private org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+ /**
+ * Create a security configuration from the global configuration.
+ * @param flinkConf the Flink global configuration.
+ */
+ public SecurityConfiguration(Configuration flinkConf) {
+ this(flinkConf, HadoopUtils.getHadoopConfiguration());
+ }
- private String keytab;
+ /**
+ * Create a security configuration from the global configuration.
+ * @param flinkConf the Flink global configuration.
+ * @param hadoopConf the Hadoop configuration.
+ */
+ public SecurityConfiguration(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) {
+ this(flinkConf, hadoopConf, DEFAULT_MODULES);
+ }
- private String principal;
+ /**
+ * Create a security configuration from the global configuration.
+ * @param flinkConf the Flink global configuration.
+ * @param hadoopConf the Hadoop configuration.
+ * @param securityModules the security modules to apply.
+ */
+ public SecurityConfiguration(Configuration flinkConf,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ List<? extends Class<? extends SecurityModule>> securityModules) {
+ this.hadoopConf = checkNotNull(hadoopConf);
+ this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
+ this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
+ this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+ this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
+ this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+ this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
+ this.securityModules = Collections.unmodifiableList(securityModules);
+
+ validate();
+ }
public String getKeytab() {
return keytab;
@@ -237,48 +183,50 @@ public class SecurityUtils {
return principal;
}
- public SecurityConfiguration(Configuration flinkConf) {
- this.flinkConf = flinkConf;
+ public boolean useTicketCache() {
+ return useTicketCache;
+ }
- String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
- String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
- validate(keytab, principal);
+ public org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+ return hadoopConf;
+ }
- this.keytab = keytab;
- this.principal = principal;
+ public List<Class<? extends SecurityModule>> getSecurityModules() {
+ return securityModules;
}
- public SecurityConfiguration setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
- this.hadoopConf = conf;
- return this;
+ public List<String> getLoginContextNames() {
+ return loginContextNames;
}
- private void validate(String keytab, String principal) {
- LOG.debug("keytab {} and principal {} .", keytab, principal);
+ public String getZooKeeperServiceName() {
+ return zkServiceName;
+ }
- if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
- !StringUtils.isBlank(keytab) && StringUtils.isBlank(principal)) {
- if(StringUtils.isBlank(keytab)) {
- LOG.warn("Keytab is null or empty");
- }
+ public String getZooKeeperLoginContextName() {
+ return zkLoginContextName;
+ }
+
+ private void validate() {
+ if(!StringUtils.isBlank(keytab)) {
+ // principal is required
if(StringUtils.isBlank(principal)) {
- LOG.warn("Principal is null or empty");
+ throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab requires a principal.");
}
- throw new RuntimeException("Requires both keytab and principal to be provided");
- }
- if(!StringUtils.isBlank(keytab)) {
+ // check the keytab is readable
File keytabFile = new File(keytab);
- if(!keytabFile.exists() || !keytabFile.isFile()) {
- LOG.warn("Not a valid keytab: {} file", keytab);
- throw new RuntimeException("Invalid keytab file: " + keytab + " passed");
+ if(!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
+ throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab is unreadable");
}
}
-
}
- public boolean securityIsEnabled() {
- return keytab != null && principal != null;
+ private static List<String> parseList(String value) {
+ if(value == null) {
+ return Collections.emptyList();
+ }
+ return Arrays.asList(value.split(","));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
new file mode 100644
index 0000000..9344faf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Responsible for installing a Hadoop login user.
+ */
+public class HadoopModule implements SecurityModule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);
+
+ UserGroupInformation loginUser;
+
+ @Override
+ public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+ UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
+
+ try {
+ if (UserGroupInformation.isSecurityEnabled() &&
+ !StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
+ String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
+
+ UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
+
+ loginUser = UserGroupInformation.getLoginUser();
+
+ // supplement with any available tokens
+ String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ if (fileLocation != null) {
+ /*
+ * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
+ * used in the context of reading the stored tokens from UGI.
+ * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+ * loginUser.addCredentials(cred);
+ */
+ try {
+ Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+ File.class, org.apache.hadoop.conf.Configuration.class);
+ Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
+ securityConfig.getHadoopConfiguration());
+ Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
+ Credentials.class);
+ addCredentialsMethod.invoke(loginUser, cred);
+ } catch (NoSuchMethodException e) {
+ LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+ } catch (InvocationTargetException e) {
+ throw e.getTargetException();
+ }
+ }
+ } else {
+ // login with current user credentials (e.g. ticket cache, OS login)
+ // note that the stored tokens are read automatically
+ try {
+ //Use reflection API to get the login user object
+ //UserGroupInformation.loginUserFromSubject(null);
+ Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+ loginUserFromSubjectMethod.invoke(null, (Subject) null);
+ } catch (NoSuchMethodException e) {
+ LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+ } catch (InvocationTargetException e) {
+ throw e.getTargetException();
+ }
+
+ loginUser = UserGroupInformation.getLoginUser();
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // note: UGI::hasKerberosCredentials inaccurately reports false
+ // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+ // so we check only in ticket cache scenario.
+ if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) {
+ // a delegation token is an adequate substitute in most cases
+ if (!HadoopUtils.hasHDFSDelegationToken()) {
+ LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials");
+ }
+ }
+ }
+
+ LOG.info("Hadoop user set to {}", loginUser);
+
+ } catch (Throwable ex) {
+ throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
+ }
+ }
+
+ @Override
+ public void uninstall() throws SecurityInstallException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
new file mode 100644
index 0000000..f8b9bdf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Responsible for installing a process-wide JAAS configuration.
+ * <p>
+ * The installed configuration combines login modules based on:
+ * - the user-supplied JAAS configuration file, if any
+ * - a Kerberos keytab, if configured
+ * - any cached Kerberos credentials from the current environment
+ * <p>
+ * The module also installs a default JAAS config file (if necessary) for
+ * compatibility with ZK and Kafka. Note that the JRE actually draws on numerous file locations.
+ * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
+ * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+ */
+@Internal
+public class JaasModule implements SecurityModule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class);
+
+ static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+ static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
+
+ private String priorConfigFile;
+ private javax.security.auth.login.Configuration priorConfig;
+
+ private DynamicConfiguration currentConfig;
+
+ @Override
+ public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+ // ensure that a config file is always defined, for compatibility with
+ // ZK and Kafka which check for the system property and existence of the file
+ priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
+ if (priorConfigFile == null) {
+ File configFile = generateDefaultConfigFile();
+ System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
+ }
+
+ // read the JAAS configuration file
+ priorConfig = javax.security.auth.login.Configuration.getConfiguration();
+
+ // construct a dynamic JAAS configuration
+ currentConfig = new DynamicConfiguration(priorConfig);
+
+ // wire up the configured JAAS login contexts to use the krb5 entries
+ AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
+ if(krb5Entries != null) {
+ for (String app : securityConfig.getLoginContextNames()) {
+ currentConfig.addAppConfigurationEntry(app, krb5Entries);
+ }
+ }
+
+ javax.security.auth.login.Configuration.setConfiguration(currentConfig);
+ }
+
+ @Override
+ public void uninstall() throws SecurityInstallException {
+ if(priorConfigFile != null) {
+ System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
+ } else {
+ System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
+ }
+ javax.security.auth.login.Configuration.setConfiguration(priorConfig);
+ }
+
+ public DynamicConfiguration getCurrentConfiguration() {
+ return currentConfig;
+ }
+
+ private static AppConfigurationEntry[] getAppConfigurationEntries(SecurityUtils.SecurityConfiguration securityConfig) {
+
+ AppConfigurationEntry userKerberosAce = null;
+ if (securityConfig.useTicketCache()) {
+ userKerberosAce = KerberosUtils.ticketCacheEntry();
+ }
+ AppConfigurationEntry keytabKerberosAce = null;
+ if (securityConfig.getKeytab() != null) {
+ keytabKerberosAce = KerberosUtils.keytabEntry(securityConfig.getKeytab(), securityConfig.getPrincipal());
+ }
+
+ AppConfigurationEntry[] appConfigurationEntry;
+ if (userKerberosAce != null && keytabKerberosAce != null) {
+ appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce, userKerberosAce};
+ } else if (keytabKerberosAce != null) {
+ appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce};
+ } else if (userKerberosAce != null) {
+ appConfigurationEntry = new AppConfigurationEntry[]{userKerberosAce};
+ } else {
+ return null;
+ }
+
+ return appConfigurationEntry;
+ }
+
+ /**
+ * Generate the default JAAS config file.
+ */
+ private static File generateDefaultConfigFile() {
+ final File jaasConfFile;
+ try {
+ Path jaasConfPath = Files.createTempFile("jaas-", ".conf");
+ try (InputStream resourceStream = JaasModule.class.getClassLoader().getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
+ Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+ }
+ jaasConfFile = jaasConfPath.toFile();
+ jaasConfFile.deleteOnExit();
+ } catch (IOException e) {
+ throw new RuntimeException("unable to generate a JAAS configuration file", e);
+ }
+ return jaasConfFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
new file mode 100644
index 0000000..fbe1db9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+import java.security.GeneralSecurityException;
+
+/**
+ * An installable security module.
+ */
+public interface SecurityModule {
+
+ /**
+ * Install the security module.
+ *
+ * @param configuration the security configuration.
+ * @throws SecurityInstallException if the security module couldn't be installed.
+ */
+ void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException;
+
+ /**
+ * Uninstall the security module.
+ *
+ * @throws SecurityInstallException if the security module couldn't be uninstalled.
+ * @throws UnsupportedOperationException if the security module doesn't support uninstallation.
+ */
+ void uninstall() throws SecurityInstallException;
+
+ /**
+ * Indicates a problem with installing or uninstalling a security module.
+ */
+ class SecurityInstallException extends GeneralSecurityException {
+ private static final long serialVersionUID = 1L;
+
+ public SecurityInstallException(String msg) {
+ super(msg);
+ }
+
+ public SecurityInstallException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc3a778c/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
new file mode 100644
index 0000000..c0ba4a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+/**
+ * Responsible for installing a process-wide ZooKeeper security configuration.
+ */
+public class ZooKeeperModule implements SecurityModule {
+
+ private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+ /**
+ * A system property for setting whether ZK uses SASL.
+ */
+ private static final String ZK_ENABLE_CLIENT_SASL = "zookeeper.sasl.client";
+
+ /**
+ * A system property for setting the expected ZooKeeper service name.
+ */
+ private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+ /**
+ * A system property for setting the login context name to use.
+ */
+ private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
+
+ private String priorServiceName;
+
+ private String priorLoginContextName;
+
+ @Override
+ public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+
+ priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
+ if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
+ System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
+ }
+
+ priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
+ if (!"Client".equals(configuration.getZooKeeperLoginContextName())) {
+ System.setProperty(ZK_LOGIN_CONTEXT_NAME, configuration.getZooKeeperLoginContextName());
+ }
+ }
+
+ @Override
+ public void uninstall() throws SecurityInstallException {
+ if(priorServiceName != null) {
+ System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
+ } else {
+ System.clearProperty(ZK_SASL_CLIENT_USERNAME);
+ }
+ if(priorLoginContextName != null) {
+ System.setProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
+ } else {
+ System.clearProperty(ZK_LOGIN_CONTEXT_NAME);
+ }
+ }
+
+}