Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/09 12:08:56 UTC

[flink] branch release-1.11 updated: [FLINK-18045] Fix Kerberos credentials checking

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new c820534  [FLINK-18045] Fix Kerberos credentials checking
c820534 is described below

commit c8205341418d9139dc59d2fcd7b5dbae6b4c5f98
Author: Bartosz Krasinski <ba...@krasinski.biz>
AuthorDate: Wed Jun 3 00:20:25 2020 +0200

    [FLINK-18045] Fix Kerberos credentials checking
    
    This closes #12462.
---
 .../org/apache/flink/runtime/util/HadoopUtils.java |  42 +++---
 .../apache/flink/runtime/util/HadoopUtilsTest.java | 147 +++++++++++++++++++++
 .../runtime/security/modules/HadoopModule.java     |   8 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  10 +-
 pom.xml                                            |   1 +
 5 files changed, 181 insertions(+), 27 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index e91cd99..e01d59b 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -43,7 +44,7 @@ public class HadoopUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
 
-	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+	static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
 
 	@SuppressWarnings("deprecation")
 	public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
@@ -112,29 +113,36 @@ public class HadoopUtils {
 		return result;
 	}
 
-	public static boolean isCredentialsConfigured(UserGroupInformation ugi, boolean useTicketCache) throws Exception {
-		if (UserGroupInformation.isSecurityEnabled()) {
-			if (useTicketCache && !ugi.hasKerberosCredentials()) {
-				// a delegation token is an adequate substitute in most cases
-				if (!HadoopUtils.hasHDFSDelegationToken()) {
-					LOG.error("Hadoop security is enabled, but current login user has neither Kerberos credentials " +
-						"nor delegation tokens!");
-					return false;
-				} else {
-					LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials, " +
-						"use delegation token instead. Flink application will terminate after token expires.");
-				}
+	public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi) {
+		return UserGroupInformation.isSecurityEnabled() && ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS;
+	}
+
+	public static boolean areKerberosCredentialsValid(UserGroupInformation ugi, boolean useTicketCache) {
+		Preconditions.checkState(isKerberosSecurityEnabled(ugi));
+
+		// note: UGI::hasKerberosCredentials inaccurately reports false
+		// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+		// so we check only in ticket cache scenario.
+		if (useTicketCache && !ugi.hasKerberosCredentials()) {
+			if (hasHDFSDelegationToken(ugi)) {
+				LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials, " +
+					"use delegation token instead. Flink application will terminate after token expires.");
+				return true;
+			} else {
+				LOG.error("Hadoop security is enabled, but current login user has neither Kerberos credentials " +
+					"nor delegation tokens!");
+				return false;
 			}
 		}
+
 		return true;
 	}
 
 	/**
-	 * Indicates whether the current user has an HDFS delegation token.
+	 * Indicates whether the user has an HDFS delegation token.
 	 */
-	public static boolean hasHDFSDelegationToken() throws Exception {
-		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+	public static boolean hasHDFSDelegationToken(UserGroupInformation ugi) {
+		Collection<Token<? extends TokenIdentifier>> usrTok = ugi.getTokens();
 		for (Token<? extends TokenIdentifier> token : usrTok) {
 			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
 				return true;
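
For context, callers of HadoopUtils now gate the credentials check on isKerberosSecurityEnabled and pass the UGI in explicitly, instead of relying on UserGroupInformation.getCurrentUser() inside the utility. A minimal sketch of the intended call sequence, mirroring the YarnClusterDescriptor change further below (the surrounding method and the flinkConfiguration variable are assumed from that context, not part of this file):

    final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
        // Only consult the ticket cache / delegation tokens when Kerberos is actually in use.
        boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
        if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
            throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
                "does not have Kerberos credentials or delegation tokens!");
        }
    }

Note that the two call sites below make different choices with the result: YarnClusterDescriptor fails hard with a RuntimeException, while HadoopModule only logs whether the credentials are valid.
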
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
new file mode 100644
index 0000000..472a4ff
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.Token;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import sun.security.krb5.KrbException;
+
+import static org.apache.flink.runtime.util.HadoopUtils.HDFS_DELEGATION_TOKEN_KIND;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Unit tests for Hadoop utils.
+ */
+public class HadoopUtilsTest extends TestLogger {
+
+	@BeforeClass
+	public static void setPropertiesToEnableKerberosConfigInit() throws KrbException {
+		System.setProperty("java.security.krb5.realm", "");
+		System.setProperty("java.security.krb5.kdc", "");
+		System.setProperty("java.security.krb5.conf", "/dev/null");
+		sun.security.krb5.Config.refresh();
+	}
+
+	@Test
+	public void testShouldReturnFalseWhenNoKerberosCredentialsOrDelegationTokens() {
+		UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+		UserGroupInformation userWithoutCredentialsOrTokens = createTestUser(AuthenticationMethod.KERBEROS);
+		assumeFalse(userWithoutCredentialsOrTokens.hasKerberosCredentials());
+
+		boolean isKerberosEnabled = HadoopUtils.isKerberosSecurityEnabled(userWithoutCredentialsOrTokens);
+		boolean result = HadoopUtils.areKerberosCredentialsValid(userWithoutCredentialsOrTokens, true);
+
+		assertTrue(isKerberosEnabled);
+		assertFalse(result);
+	}
+
+	@Test
+	public void testShouldReturnTrueWhenDelegationTokenIsPresent() {
+		UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+		UserGroupInformation userWithoutCredentialsButHavingToken = createTestUser(AuthenticationMethod.KERBEROS);
+		userWithoutCredentialsButHavingToken.addToken(getHDFSDelegationToken());
+		assumeFalse(userWithoutCredentialsButHavingToken.hasKerberosCredentials());
+
+		boolean result = HadoopUtils.areKerberosCredentialsValid(userWithoutCredentialsButHavingToken, true);
+
+		assertTrue(result);
+	}
+
+	@Test
+	public void testShouldReturnTrueWhenKerberosCredentialsArePresent() {
+		UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+		UserGroupInformation userWithCredentials = Mockito.mock(UserGroupInformation.class);
+		Mockito.when(userWithCredentials.getAuthenticationMethod()).thenReturn(AuthenticationMethod.KERBEROS);
+		Mockito.when(userWithCredentials.hasKerberosCredentials()).thenReturn(true);
+
+		boolean result = HadoopUtils.areKerberosCredentialsValid(userWithCredentials, true);
+
+		assertTrue(result);
+	}
+
+	@Test
+	public void isKerberosSecurityEnabled_NoKerberos_ReturnsFalse() {
+		UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.PROXY));
+		UserGroupInformation userWithAuthMethodOtherThanKerberos = createTestUser(AuthenticationMethod.PROXY);
+
+		boolean result = HadoopUtils.isKerberosSecurityEnabled(userWithAuthMethodOtherThanKerberos);
+
+		assertFalse(result);
+	}
+
+	@Test
+	public void testShouldReturnTrueIfTicketCacheIsNotUsed() {
+		UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+		UserGroupInformation user = createTestUser(AuthenticationMethod.KERBEROS);
+
+		boolean result = HadoopUtils.areKerberosCredentialsValid(user, false);
+
+		assertTrue(result);
+	}
+
+	@Test
+	public void testShouldCheckIfTheUserHasHDFSDelegationToken() {
+		UserGroupInformation userWithToken = createTestUser(AuthenticationMethod.KERBEROS);
+		userWithToken.addToken(getHDFSDelegationToken());
+
+		boolean result = HadoopUtils.hasHDFSDelegationToken(userWithToken);
+
+		assertTrue(result);
+	}
+
+	@Test
+	public void testShouldReturnFalseIfTheUserHasNoHDFSDelegationToken() {
+		UserGroupInformation userWithoutToken = createTestUser(AuthenticationMethod.KERBEROS);
+		assumeTrue(userWithoutToken.getTokens().isEmpty());
+
+		boolean result = HadoopUtils.hasHDFSDelegationToken(userWithoutToken);
+
+		assertFalse(result);
+	}
+
+	private static Configuration getHadoopConfigWithAuthMethod(AuthenticationMethod authenticationMethod) {
+		Configuration conf = new Configuration(true);
+		conf.set("hadoop.security.authentication", authenticationMethod.name());
+		return conf;
+	}
+
+	private static UserGroupInformation createTestUser(AuthenticationMethod authenticationMethod) {
+		UserGroupInformation user = UserGroupInformation.createRemoteUser("test-user");
+		user.setAuthenticationMethod(authenticationMethod);
+		return user;
+	}
+
+	private static Token<DelegationTokenIdentifier> getHDFSDelegationToken() {
+		Token<DelegationTokenIdentifier> token = new Token<>();
+		token.setKind(HDFS_DELEGATION_TOKEN_KIND);
+		return token;
+	}
+
+}
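
The @BeforeClass hook above sets empty java.security.krb5.realm/kdc properties and points java.security.krb5.conf at /dev/null so that sun.security.krb5.Config.refresh() can initialize a stub Kerberos configuration without a real krb5.conf or KDC; this is what lets the tests switch UserGroupInformation into KERBEROS mode on any machine. Assuming the standard Surefire setup, the new tests can be run in isolation from the repository root with something like:

    mvn test -pl flink-filesystems/flink-hadoop-fs -Dtest=HadoopUtilsTest
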
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index b9250e6..f625b73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -137,11 +137,13 @@ public class HadoopModule implements SecurityModule {
 				loginUser = UserGroupInformation.getLoginUser();
 			}
 
-			boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
-				loginUser, securityConfig.useTicketCache());
+			LOG.info("Hadoop user set to {}", loginUser);
 
-			LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);
+			if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
+				boolean isCredentialsConfigured = HadoopUtils.areKerberosCredentialsValid(loginUser, securityConfig.useTicketCache());
 
+				LOG.info("Kerberos security is enabled and credentials are {}.", isCredentialsConfigured ? "valid" : "invalid");
+			}
 		} catch (Throwable ex) {
 			throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
 		}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 6acd05d..52ac1b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -468,15 +468,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 			@Nullable JobGraph jobGraph,
 			boolean detached) throws Exception {
 
-		if (UserGroupInformation.isSecurityEnabled()) {
-			// note: UGI::hasKerberosCredentials inaccurately reports false
-			// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
-			// so we check only in ticket cache scenario.
+		final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+		if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
 			boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
 
-			boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
-				UserGroupInformation.getCurrentUser(), useTicketCache);
-			if (!isCredentialsConfigured) {
+			if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
 				throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
 					"does not have Kerberos credentials or delegation tokens!");
 			}
diff --git a/pom.xml b/pom.xml
index 89f791a..5d76464 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1007,6 +1007,7 @@ under the License.
 								<arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
 								<arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
 								<arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
+								<arg>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</arg>
 							</compilerArgs>
 						</configuration>
 					</plugin>
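
The extra --add-exports entry is needed because HadoopUtilsTest imports the internal sun.security.krb5 package, which the java.security.jgss module does not export on JDK 9+. The maven-compiler-plugin passes compilerArgs straight through to javac, so compiling the test sources is roughly equivalent to invoking:

    javac --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED <sources>

(where <sources> is a placeholder for the usual compiler inputs).
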