You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/12/16 17:00:40 UTC

[7/7] flink git commit: [FLINK-5350] don't overwrite an existing JAAS config

[FLINK-5350] don't overwrite an existing JAAS config

Users may want to use SASL/PLAIN https://tools.ietf.org/html/rfc4616
without Kerberos enabled.

Skip security configuration if no Kerberos credentials are available.

This closes #3017.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0506a63c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0506a63c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0506a63c

Branch: refs/heads/master
Commit: 0506a63c8a7e50a0eaf66cd0bbec42e2fac5017c
Parents: becd270
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Dec 15 15:29:21 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100

----------------------------------------------------------------------
 .../runtime/security/HadoopSecurityContext.java |  5 ++--
 .../flink/runtime/security/SecurityUtils.java   | 30 ++++++++++++++------
 .../runtime/security/SecurityUtilsTest.java     | 23 +++++++++++++++
 3 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0506a63c/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
index ea6e5e3..c70f00b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
@@ -25,9 +25,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
 
 /*
- * Process-wide security context object which initializes UGI with appropriate security credentials and also it
- * creates in-memory JAAS configuration object which will serve appropriate ApplicationConfigurationEntry for the
- * connector login module implementation that authenticates Kerberos identity using SASL/JAAS based mechanism.
+ * Hadoop security context which runs a Callable with the previously
+ * initialized UGI and appropriate security credentials.
  */
 class HadoopSecurityContext implements SecurityContext {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0506a63c/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index 7416cc6..d7fc6ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -64,12 +64,16 @@ public class SecurityUtils {
 	public static SecurityContext getInstalledContext() { return installedContext; }
 
 	/**
-	 * Performs a static initialization of the JAAS and Hadoop UGI security mechanism
+	 * Performs a static initialization of the JAAS and Hadoop UGI security mechanism.
+	 * It creates the in-memory JAAS configuration object which will serve appropriate
+	 * ApplicationConfigurationEntry for the connector login module implementation that
+	 * authenticates Kerberos identity using SASL/JAAS based mechanism.
 	 */
 	public static void install(SecurityConfiguration config) throws Exception {
 
-		if (!(installedContext instanceof NoOpSecurityContext)) {
-			LOG.warn("overriding previous security context");
+		if (!config.securityIsEnabled()) {
+			// do not perform any initialization if no Kerberos crendetails are provided
+			return;
 		}
 
 		// establish the JAAS config
@@ -151,10 +155,18 @@ public class SecurityUtils {
 				}
 			}
 
+			if (!(installedContext instanceof NoOpSecurityContext)) {
+				LOG.warn("overriding previous security context");
+			}
+
 			installedContext = new HadoopSecurityContext(loginUser);
 		}
 	}
 
+	static void clearContext() {
+		installedContext = new NoOpSecurityContext();
+	}
+
 	/*
 	 * This method configures some of the system properties that are require for ZK and Kafka SASL authentication
 	 * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
@@ -163,11 +175,7 @@ public class SecurityUtils {
 	 * Kafka current code behavior.
 	 */
 	private static void populateSystemSecurityProperties(Configuration configuration) {
-		Preconditions.checkNotNull(configuration, "The supplied configuation was null");
-
-		//required to be empty for Kafka but we will override the property
-		//with pseudo JAAS configuration file if SASL auth is enabled for ZK
-		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+		Preconditions.checkNotNull(configuration, "The supplied configuration was null");
 
 		boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
 
@@ -203,7 +211,7 @@ public class SecurityUtils {
 		String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
 		if (!StringUtils.isBlank(zkSaslServiceName)) {
 			LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
-			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
+			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName);
 		}
 
 	}
@@ -268,6 +276,10 @@ public class SecurityUtils {
 			}
 
 		}
+
+		public boolean securityIsEnabled() {
+			return keytab != null && principal != null;
+		}
 	}
 
 	// Just a util, shouldn't be instantiated.

http://git-wip-us.apache.org/repos/asf/flink/blob/0506a63c/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index ecb89e0..1d38899 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -19,6 +19,8 @@ package org.apache.flink.runtime.security;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -31,6 +33,12 @@ import static org.junit.Assert.fail;
  */
 public class SecurityUtilsTest {
 
+	@AfterClass
+	public static void afterClass() {
+		SecurityUtils.clearContext();
+		System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+	}
+
 	@Test
 	public void testCreateInsecureHadoopCtx() {
 		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(new Configuration());
@@ -51,6 +59,21 @@ public class SecurityUtilsTest {
 		}
 	}
 
+	@Test
+	/**
+	 * The Jaas configuration file provided should not be overridden.
+	 */
+	public void testJaasPropertyOverride() throws Exception {
+		String confFile = "jaas.conf";
+		System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, confFile);
+
+		SecurityUtils.install(new SecurityUtils.SecurityConfiguration(new Configuration()));
+
+		Assert.assertEquals(
+			confFile,
+			System.getProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG));
+	}
+
 
 	private String getOSUserName() throws Exception {
 		String userName = "";