Posted to commits@flink.apache.org by mx...@apache.org on 2016/12/16 17:00:40 UTC
[7/7] flink git commit: [FLINK-5350] don't overwrite an existing JAAS
config
[FLINK-5350] don't overwrite an existing JAAS config
Users may want to use SASL/PLAIN https://tools.ietf.org/html/rfc4616
without Kerberos enabled.
Skip security configuration if no Kerberos credentials are available.
This closes #3017.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0506a63c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0506a63c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0506a63c
Branch: refs/heads/master
Commit: 0506a63c8a7e50a0eaf66cd0bbec42e2fac5017c
Parents: becd270
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Dec 15 15:29:21 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100
----------------------------------------------------------------------
.../runtime/security/HadoopSecurityContext.java | 5 ++--
.../flink/runtime/security/SecurityUtils.java | 30 ++++++++++++++------
.../runtime/security/SecurityUtilsTest.java | 23 +++++++++++++++
3 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0506a63c/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
index ea6e5e3..c70f00b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
@@ -25,9 +25,8 @@ import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
/*
- * Process-wide security context object which initializes UGI with appropriate security credentials and also it
- * creates in-memory JAAS configuration object which will serve appropriate ApplicationConfigurationEntry for the
- * connector login module implementation that authenticates Kerberos identity using SASL/JAAS based mechanism.
+ * Hadoop security context which runs a Callable with the previously
+ * initialized UGI and appropriate security credentials.
*/
class HadoopSecurityContext implements SecurityContext {
http://git-wip-us.apache.org/repos/asf/flink/blob/0506a63c/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index 7416cc6..d7fc6ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -64,12 +64,16 @@ public class SecurityUtils {
public static SecurityContext getInstalledContext() { return installedContext; }
/**
- * Performs a static initialization of the JAAS and Hadoop UGI security mechanism
+ * Performs a static initialization of the JAAS and Hadoop UGI security mechanism.
+ * It creates the in-memory JAAS configuration object which will serve appropriate
+ * ApplicationConfigurationEntry for the connector login module implementation that
+ * authenticates Kerberos identity using SASL/JAAS based mechanism.
*/
public static void install(SecurityConfiguration config) throws Exception {
- if (!(installedContext instanceof NoOpSecurityContext)) {
- LOG.warn("overriding previous security context");
+ if (!config.securityIsEnabled()) {
+ // do not perform any initialization if no Kerberos crendetails are provided
+ return;
}
// establish the JAAS config
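Review bot: The early return behind the new `if (!config.securityIsEnabled())` leaves `installedContext` untouched, so an install() call with a credential-less configuration after an earlier Kerberos install silently keeps the previous HadoopSecurityContext, and the "overriding previous security context" warning that this branch replaced has moved to the end of the method where it no longer fires for this case. A log line stating why setup is skipped, e.g. `LOG.info("No Kerberos keytab/principal configured; skipping JAAS and UGI security initialization");`, would make that visible to operators, and the new comment has a typo (`crendetails` → `credentials`). The added test in SecurityUtilsTest appears to exercise only this early-return path, so the changed behaviour of populateSystemSecurityProperties further down is not covered by it. install's body continues outside the hunk.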
@@ -151,10 +155,18 @@ public class SecurityUtils {
}
}
+ if (!(installedContext instanceof NoOpSecurityContext)) {
+ LOG.warn("overriding previous security context");
+ }
+
installedContext = new HadoopSecurityContext(loginUser);
}
}
+ static void clearContext() {
+ installedContext = new NoOpSecurityContext();
+ }
+
/*
* This method configures some of the system properties that are require for ZK and Kafka SASL authentication
* See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
@@ -163,11 +175,7 @@ public class SecurityUtils {
* Kafka current code behavior.
*/
private static void populateSystemSecurityProperties(Configuration configuration) {
- Preconditions.checkNotNull(configuration, "The supplied configuation was null");
-
- //required to be empty for Kafka but we will override the property
- //with pseudo JAAS configuration file if SASL auth is enabled for ZK
- System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+ Preconditions.checkNotNull(configuration, "The supplied configuration was null");
boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
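Review bot: The deleted `System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "")` carried the rationale "required to be empty for Kafka", and the hunk removes that reasoning without recording why it no longer applies; presumably a pre-existing `java.security.auth.login.config` is now left for the Kafka/ZK login code to pick up as-is. A short comment at this point, such as `// a pre-set java.security.auth.login.config is intentionally preserved so users can supply their own JAAS file (FLINK-5350)`, would keep that intent from being reverted later. populateSystemSecurityProperties continues outside the hunk.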
@@ -203,7 +211,7 @@ public class SecurityUtils {
String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
if (!StringUtils.isBlank(zkSaslServiceName)) {
LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
- System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
+ System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName);
}
}
@@ -268,6 +276,10 @@ public class SecurityUtils {
}
}
+
+ public boolean securityIsEnabled() {
+ return keytab != null && principal != null;
+ }
}
// Just a util, shouldn't be instantiated.
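Review bot: `securityIsEnabled()` treats a configuration with only one of `keytab`/`principal` set as "no security", so a partially filled Kerberos configuration now makes install() skip all setup instead of failing; if both values are expected together, a check in the SecurityConfiguration constructor (outside the hunk) such as `if ((keytab == null) != (principal == null)) throw new IllegalArgumentException("Kerberos keytab and principal must be configured together");` would surface the misconfiguration. Any caller that previously relied on a Kerberos ticket cache without a keytab, if that path existed, would also skip UGI login now, which is worth confirming. The method has no Javadoc; `/** Returns true if both a Kerberos keytab and a principal are configured, i.e. JAAS/UGI setup should run. */` would cover it, and `isSecurityEnabled()` would follow the usual Java boolean-accessor naming. The enclosing SecurityConfiguration class starts outside the hunk.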
http://git-wip-us.apache.org/repos/asf/flink/blob/0506a63c/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index ecb89e0..1d38899 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -19,6 +19,8 @@ package org.apache.flink.runtime.security;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Test;
import java.lang.reflect.Method;
@@ -31,6 +33,12 @@ import static org.junit.Assert.fail;
*/
public class SecurityUtilsTest {
+ @AfterClass
+ public static void afterClass() {
+ SecurityUtils.clearContext();
+ System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+ }
+
@Test
public void testCreateInsecureHadoopCtx() {
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(new Configuration());
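Review bot: `afterClass` resets `java.security.auth.login.config` to the empty string rather than to the value in effect before the class ran, and because it runs once after all tests, the `jaas.conf` value set by testJaasPropertyOverride remains visible to any test method that happens to run after it in the same class. Capturing the original value in a `@BeforeClass` and restoring it there, or resetting per test in an `@After`, would isolate the tests from each other and from the surrounding JVM.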
@@ -51,6 +59,21 @@ public class SecurityUtilsTest {
}
}
+ @Test
+ /**
+ * The Jaas configuration file provided should not be overridden.
+ */
+ public void testJaasPropertyOverride() throws Exception {
+ String confFile = "jaas.conf";
+ System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, confFile);
+
+ SecurityUtils.install(new SecurityUtils.SecurityConfiguration(new Configuration()));
+
+ Assert.assertEquals(
+ confFile,
+ System.getProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG));
+ }
+
private String getOSUserName() throws Exception {
String userName = "";