You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/09 12:08:56 UTC
[flink] branch release-1.11 updated: [FLINK-18045] Fix Kerberos credentials checking
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new c820534 [FLINK-18045] Fix Kerberos credentials checking
c820534 is described below
commit c8205341418d9139dc59d2fcd7b5dbae6b4c5f98
Author: Bartosz Krasinski <ba...@krasinski.biz>
AuthorDate: Wed Jun 3 00:20:25 2020 +0200
[FLINK-18045] Fix Kerberos credentials checking
This closes #12462.
---
.../org/apache/flink/runtime/util/HadoopUtils.java | 42 +++---
.../apache/flink/runtime/util/HadoopUtilsTest.java | 147 +++++++++++++++++++++
.../runtime/security/modules/HadoopModule.java | 8 +-
.../apache/flink/yarn/YarnClusterDescriptor.java | 10 +-
pom.xml | 1 +
5 files changed, 181 insertions(+), 27 deletions(-)
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index e91cd99..e01d59b 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -43,7 +44,7 @@ public class HadoopUtils {
private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
- private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+ static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
@SuppressWarnings("deprecation")
public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
@@ -112,29 +113,36 @@ public class HadoopUtils {
return result;
}
- public static boolean isCredentialsConfigured(UserGroupInformation ugi, boolean useTicketCache) throws Exception {
- if (UserGroupInformation.isSecurityEnabled()) {
- if (useTicketCache && !ugi.hasKerberosCredentials()) {
- // a delegation token is an adequate substitute in most cases
- if (!HadoopUtils.hasHDFSDelegationToken()) {
- LOG.error("Hadoop security is enabled, but current login user has neither Kerberos credentials " +
- "nor delegation tokens!");
- return false;
- } else {
- LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials, " +
- "use delegation token instead. Flink application will terminate after token expires.");
- }
+ public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi) {
+ return UserGroupInformation.isSecurityEnabled() && ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS;
+ }
+
+ public static boolean areKerberosCredentialsValid(UserGroupInformation ugi, boolean useTicketCache) {
+ Preconditions.checkState(isKerberosSecurityEnabled(ugi));
+
+ // note: UGI::hasKerberosCredentials inaccurately reports false
+ // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+ // so we check only in ticket cache scenario.
+ if (useTicketCache && !ugi.hasKerberosCredentials()) {
+ if (hasHDFSDelegationToken(ugi)) {
+ LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials, " +
+ "use delegation token instead. Flink application will terminate after token expires.");
+ return true;
+ } else {
+ LOG.error("Hadoop security is enabled, but current login user has neither Kerberos credentials " +
+ "nor delegation tokens!");
+ return false;
}
}
+
return true;
}
/**
- * Indicates whether the current user has an HDFS delegation token.
+ * Indicates whether the user has an HDFS delegation token.
*/
- public static boolean hasHDFSDelegationToken() throws Exception {
- UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
- Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+ public static boolean hasHDFSDelegationToken(UserGroupInformation ugi) {
+ Collection<Token<? extends TokenIdentifier>> usrTok = ugi.getTokens();
for (Token<? extends TokenIdentifier> token : usrTok) {
if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
return true;
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
new file mode 100644
index 0000000..472a4ff
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.Token;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import sun.security.krb5.KrbException;
+
+import static org.apache.flink.runtime.util.HadoopUtils.HDFS_DELEGATION_TOKEN_KIND;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Unit tests for Hadoop utils.
+ */
+public class HadoopUtilsTest extends TestLogger {
+
+ @BeforeClass
+ public static void setPropertiesToEnableKerberosConfigInit() throws KrbException {
+ System.setProperty("java.security.krb5.realm", "");
+ System.setProperty("java.security.krb5.kdc", "");
+ System.setProperty("java.security.krb5.conf", "/dev/null");
+ sun.security.krb5.Config.refresh();
+ }
+
+ @Test
+ public void testShouldReturnFalseWhenNoKerberosCredentialsOrDelegationTokens() {
+ UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+ UserGroupInformation userWithoutCredentialsOrTokens = createTestUser(AuthenticationMethod.KERBEROS);
+ assumeFalse(userWithoutCredentialsOrTokens.hasKerberosCredentials());
+
+ boolean isKerberosEnabled = HadoopUtils.isKerberosSecurityEnabled(userWithoutCredentialsOrTokens);
+ boolean result = HadoopUtils.areKerberosCredentialsValid(userWithoutCredentialsOrTokens, true);
+
+ assertTrue(isKerberosEnabled);
+ assertFalse(result);
+ }
+
+ @Test
+ public void testShouldReturnTrueWhenDelegationTokenIsPresent() {
+ UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+ UserGroupInformation userWithoutCredentialsButHavingToken = createTestUser(AuthenticationMethod.KERBEROS);
+ userWithoutCredentialsButHavingToken.addToken(getHDFSDelegationToken());
+ assumeFalse(userWithoutCredentialsButHavingToken.hasKerberosCredentials());
+
+ boolean result = HadoopUtils.areKerberosCredentialsValid(userWithoutCredentialsButHavingToken, true);
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void testShouldReturnTrueWhenKerberosCredentialsArePresent() {
+ UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+ UserGroupInformation userWithCredentials = Mockito.mock(UserGroupInformation.class);
+ Mockito.when(userWithCredentials.getAuthenticationMethod()).thenReturn(AuthenticationMethod.KERBEROS);
+ Mockito.when(userWithCredentials.hasKerberosCredentials()).thenReturn(true);
+
+ boolean result = HadoopUtils.areKerberosCredentialsValid(userWithCredentials, true);
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void isKerberosSecurityEnabled_NoKerberos_ReturnsFalse() {
+ UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.PROXY));
+ UserGroupInformation userWithAuthMethodOtherThanKerberos = createTestUser(AuthenticationMethod.PROXY);
+
+ boolean result = HadoopUtils.isKerberosSecurityEnabled(userWithAuthMethodOtherThanKerberos);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testShouldReturnTrueIfTicketCacheIsNotUsed() {
+ UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+ UserGroupInformation user = createTestUser(AuthenticationMethod.KERBEROS);
+
+ boolean result = HadoopUtils.areKerberosCredentialsValid(user, false);
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void testShouldCheckIfTheUserHasHDFSDelegationToken() {
+ UserGroupInformation userWithToken = createTestUser(AuthenticationMethod.KERBEROS);
+ userWithToken.addToken(getHDFSDelegationToken());
+
+ boolean result = HadoopUtils.hasHDFSDelegationToken(userWithToken);
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void testShouldReturnFalseIfTheUserHasNoHDFSDelegationToken() {
+ UserGroupInformation userWithoutToken = createTestUser(AuthenticationMethod.KERBEROS);
+ assumeTrue(userWithoutToken.getTokens().isEmpty());
+
+ boolean result = HadoopUtils.hasHDFSDelegationToken(userWithoutToken);
+
+ assertFalse(result);
+ }
+
+ private static Configuration getHadoopConfigWithAuthMethod(AuthenticationMethod authenticationMethod) {
+ Configuration conf = new Configuration(true);
+ conf.set("hadoop.security.authentication", authenticationMethod.name());
+ return conf;
+ }
+
+ private static UserGroupInformation createTestUser(AuthenticationMethod authenticationMethod) {
+ UserGroupInformation user = UserGroupInformation.createRemoteUser("test-user");
+ user.setAuthenticationMethod(authenticationMethod);
+ return user;
+ }
+
+ private static Token<DelegationTokenIdentifier> getHDFSDelegationToken() {
+ Token<DelegationTokenIdentifier> token = new Token<>();
+ token.setKind(HDFS_DELEGATION_TOKEN_KIND);
+ return token;
+ }
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index b9250e6..f625b73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -137,11 +137,13 @@ public class HadoopModule implements SecurityModule {
loginUser = UserGroupInformation.getLoginUser();
}
- boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
- loginUser, securityConfig.useTicketCache());
+ LOG.info("Hadoop user set to {}", loginUser);
- LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);
+ if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
+ boolean isCredentialsConfigured = HadoopUtils.areKerberosCredentialsValid(loginUser, securityConfig.useTicketCache());
+ LOG.info("Kerberos security is enabled and credentials are {}.", isCredentialsConfigured ? "valid" : "invalid");
+ }
} catch (Throwable ex) {
throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 6acd05d..52ac1b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -468,15 +468,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {
- if (UserGroupInformation.isSecurityEnabled()) {
- // note: UGI::hasKerberosCredentials inaccurately reports false
- // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
- // so we check only in ticket cache scenario.
+ final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
- boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
- UserGroupInformation.getCurrentUser(), useTicketCache);
- if (!isCredentialsConfigured) {
+ if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
"does not have Kerberos credentials or delegation tokens!");
}
diff --git a/pom.xml b/pom.xml
index 89f791a..5d76464 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1007,6 +1007,7 @@ under the License.
<arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
<arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
<arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
+ <arg>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>