You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/13 13:37:07 UTC
[flink] branch master updated: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3950987de04 [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager
3950987de04 is described below
commit 3950987de0496fcdca778994498768430faea95f
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Mon May 23 10:03:54 2022 +0200
[FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager
---
.../generated/security_auth_kerberos_section.html | 12 ++
.../generated/security_configuration.html | 12 ++
.../flink/configuration/SecurityOptions.java | 16 ++
.../apache/flink/runtime/util/HadoopUtilsTest.java | 4 +-
.../flink/runtime/hadoop/HadoopUserUtils.java | 32 ++++
.../security/token/DelegationTokenManager.java | 2 +-
.../token/KerberosDelegationTokenManager.java | 179 +++++++++++++++++++--
.../security/token/KerberosLoginProvider.java | 101 ++++++++++++
.../token/KerberosRenewalPossibleProvider.java | 66 --------
.../flink/runtime/hadoop/HadoopUserUtilsTest.java | 72 +++++++++
.../KerberosDelegationTokenManagerITCase.java | 72 ++++++++-
.../security/token/KerberosLoginProviderTest.java | 160 ++++++++++++++++++
.../token/KerberosRenewalPossibleProviderTest.java | 78 ---------
.../apache/flink/yarn/YarnClusterDescriptor.java | 2 +-
14 files changed, 648 insertions(+), 160 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
index 638ee4c9ec0..b0cbc3179bd 100644
--- a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
+++ b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
@@ -44,5 +44,17 @@
<td>Duration</td>
<td>The time period when keytab login happens automatically in order to always have a valid TGT.</td>
</tr>
+ <tr>
+ <td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td>
+ </tr>
+ <tr>
+ <td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td>
+ <td style="word-wrap: break-word;">0.75</td>
+ <td>Double</td>
+ <td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/layouts/shortcodes/generated/security_configuration.html b/docs/layouts/shortcodes/generated/security_configuration.html
index 87a3aab06d5..e0b9e6b0fc5 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -56,6 +56,18 @@
<td>Duration</td>
<td>The time period when keytab login happens automatically in order to always have a valid TGT.</td>
</tr>
+ <tr>
+ <td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td>
+ </tr>
+ <tr>
+ <td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td>
+ <td style="word-wrap: break-word;">0.75</td>
+ <td>Double</td>
+ <td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td>
+ </tr>
<tr>
<td><h5>security.module.factory.classes</h5></td>
<td style="word-wrap: break-word;">"org.apache.flink.runtime.security.modules.HadoopModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.JaasModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 9c125959c84..e67d3edc5b2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -132,6 +132,22 @@ public class SecurityOptions {
.withDescription(
"The time period when keytab login happens automatically in order to always have a valid TGT.");
+ /**
+ * Waiting time before another attempt is made to obtain new delegation tokens after a failed
+ * attempt. Defaults to one hour.
+ */
+ @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+ public static final ConfigOption<Duration> KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF =
+ key("security.kerberos.tokens.renewal.retry.backoff")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription(
+ "The time period how long to wait before retrying to obtain new delegation tokens after a failure.");
+
+ /**
+ * Fraction of the time remaining until the reported renewal date of the tokens after which new
+ * credentials are obtained. Defaults to 0.75.
+ */
+ @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+ public static final ConfigOption<Double> KERBEROS_TOKENS_RENEWAL_TIME_RATIO =
+ key("security.kerberos.tokens.renewal.time-ratio")
+ .doubleType()
+ .defaultValue(0.75)
+ .withDescription(
+ "Ratio of the tokens's expiration time when new credentials should be re-obtained.");
+
// ------------------------------------------------------------------------
// ZooKeeper Security Options
// ------------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
index 964508faad4..39c8ed722a7 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
@@ -32,10 +32,10 @@ import org.mockito.Mockito;
import sun.security.krb5.KrbException;
import static org.apache.flink.runtime.util.HadoopUtils.HDFS_DELEGATION_TOKEN_KIND;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Unit tests for Hadoop utils. */
public class HadoopUtilsTest extends TestLogger {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
new file mode 100644
index 00000000000..15bb508b0b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.hadoop;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Utility class for working with Hadoop user related classes. This should only be used if Hadoop is
+ * on the classpath.
+ */
+public final class HadoopUserUtils {
+
+    /** Static utility holder, not meant to be instantiated. */
+    private HadoopUserUtils() {}
+
+    /**
+     * Tells whether the given Hadoop user is a proxy user.
+     *
+     * @param ugi the user to inspect, must not be {@code null}
+     * @return {@code true} if the authentication method of the user is {@code PROXY}, {@code
+     *     false} otherwise
+     */
+    public static boolean isProxyUser(UserGroupInformation ugi) {
+        return ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
index a4efe308c4e..34869b49cd8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
@@ -32,7 +32,7 @@ public interface DelegationTokenManager {
/**
* Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
*/
- void obtainDelegationTokens(Credentials credentials);
+ void obtainDelegationTokens(Credentials credentials) throws Exception;
/**
* Creates a re-occurring task which obtains new tokens and automatically distributes them to
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
index 20fd44d6fc2..5a755ba91ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
@@ -29,16 +29,23 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -56,7 +63,11 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
private final Configuration configuration;
- private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+ private final double tokensRenewalTimeRatio;
+
+ private final long renewalRetryBackoffPeriod;
+
+ private final KerberosLoginProvider kerberosLoginProvider;
@VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
@@ -66,16 +77,37 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
@Nullable private ScheduledFuture<?> tgtRenewalFuture;
+ private final Object tokensUpdateFutureLock = new Object();
+
+ @GuardedBy("tokensUpdateFutureLock")
+ @Nullable
+ private ScheduledFuture<?> tokensUpdateFuture;
+
+ /**
+ * Creates a manager which performs its Kerberos logins through a {@link KerberosLoginProvider}
+ * built from the given configuration.
+ *
+ * @param configuration Flink configuration, must not be null
+ * @param scheduledExecutor executor used to schedule the renewal tasks; may be null here, but
+ * {@link #start()} rejects a null value
+ * @param ioExecutor executor the scheduled work is handed over to; may be null here, but
+ * {@link #start()} rejects a null value
+ */
+ public KerberosDelegationTokenManager(
+ Configuration configuration,
+ @Nullable ScheduledExecutor scheduledExecutor,
+ @Nullable ExecutorService ioExecutor) {
+ this(
+ configuration,
+ scheduledExecutor,
+ ioExecutor,
+ new KerberosLoginProvider(configuration));
+ }
+
+ /**
+ * Creates a manager which uses the given login provider, which allows tests to substitute the
+ * Kerberos login.
+ *
+ * @param configuration Flink configuration, must not be null
+ * @param scheduledExecutor executor used to schedule the renewal tasks; may be null here, but
+ * {@link #start()} rejects a null value
+ * @param ioExecutor executor the scheduled work is handed over to; may be null here, but
+ * {@link #start()} rejects a null value
+ * @param kerberosLoginProvider provider used to check whether login is possible and to log in
+ */
+ public KerberosDelegationTokenManager(
+ Configuration configuration,
+ @Nullable ScheduledExecutor scheduledExecutor,
+ @Nullable ExecutorService ioExecutor,
+ KerberosLoginProvider kerberosLoginProvider) {
this.configuration = checkNotNull(configuration, "Flink configuration must not be null");
+ // NOTE(review): this local is never read in the constructor. Presumably it is only kept for
+ // a validation side effect of the SecurityConfiguration constructor - confirm, else remove.
+ SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
+ this.tokensRenewalTimeRatio = configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO);
+ // The retry backoff is kept in milliseconds because it is used directly as scheduling delay.
+ this.renewalRetryBackoffPeriod =
+ configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
+ // NOTE(review): unlike the configuration, the login provider is not null-checked here.
+ this.kerberosLoginProvider = kerberosLoginProvider;
+ this.delegationTokenProviders = loadProviders();
this.scheduledExecutor = scheduledExecutor;
this.ioExecutor = ioExecutor;
- this.kerberosRenewalPossibleProvider =
- new KerberosRenewalPossibleProvider(new SecurityConfiguration(configuration));
- this.delegationTokenProviders = loadProviders();
}
private Map<String, DelegationTokenProvider> loadProviders() {
@@ -127,8 +159,59 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
* Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
*/
@Override
- public void obtainDelegationTokens(Credentials credentials) {
+ public void obtainDelegationTokens(Credentials credentials) throws Exception {
LOG.info("Obtaining delegation tokens");
+
+ // Delegation tokens can only be obtained if the real user has Kerberos credentials, so
+ // skip creation when those are not available.
+ if (kerberosLoginProvider.isLoginPossible()) {
+ // The providers are invoked as the freshly logged-in user. The next renewal time they
+ // report is not needed for a one-time obtain, hence the action returns null.
+ UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+ freshUGI.doAs(
+ (PrivilegedExceptionAction<Void>)
+ () -> {
+ obtainDelegationTokensAndGetNextRenewal(credentials);
+ return null;
+ });
+ LOG.info("Delegation tokens obtained successfully");
+ } else {
+ LOG.info("Real user has no kerberos credentials so no tokens obtained");
+ }
+ }
+
+    /**
+     * Lets every loaded provider which requires delegation tokens obtain them into the given
+     * credentials and determines the earliest renewal time reported by the providers.
+     *
+     * @param credentials container which receives the obtained tokens
+     * @return the smallest renewal time reported by any provider, or empty if no provider
+     *     reported one
+     */
+    protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {
+        final Optional<Long> earliestRenewal =
+                delegationTokenProviders.values().stream()
+                        .map(provider -> obtainTokensFromProvider(provider, credentials))
+                        .flatMap(
+                                renewal ->
+                                        renewal.isPresent()
+                                                ? Stream.of(renewal.get())
+                                                : Stream.<Long>empty())
+                        .min(Long::compare);
+
+        // Log what ended up in the credentials to ease debugging of token problems.
+        credentials
+                .getAllTokens()
+                .forEach(
+                        obtainedToken ->
+                                LOG.debug(
+                                        "Token Service:{} Identifier:{}",
+                                        obtainedToken.getService(),
+                                        obtainedToken.getIdentifier()));
+
+        return earliestRenewal;
+    }
+
+    /**
+     * Obtains tokens from a single provider if that provider requires them.
+     *
+     * @return the renewal time reported by the provider, or empty if no tokens were required
+     */
+    private Optional<Long> obtainTokensFromProvider(
+            DelegationTokenProvider provider, Credentials credentials) {
+        if (!provider.delegationTokensRequired()) {
+            LOG.debug(
+                    "Service {} does not need to obtain delegation token",
+                    provider.serviceName());
+            return Optional.empty();
+        }
+
+        LOG.debug("Obtaining delegation token for service {}", provider.serviceName());
+        final Optional<Long> providerRenewal = provider.obtainDelegationTokens(credentials);
+        LOG.debug(
+                "Obtained delegation token for service {} successfully",
+                provider.serviceName());
+        return providerRenewal;
     }
/**
@@ -139,18 +222,24 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
public void start() throws Exception {
checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
checkNotNull(ioExecutor, "IO executor must not be null");
- checkState(tgtRenewalFuture == null, "Manager is already started");
+ // tokensUpdateFuture is guarded by this lock, so the "already started" check happens under it.
+ synchronized (tokensUpdateFutureLock) {
+ checkState(
+ tgtRenewalFuture == null && tokensUpdateFuture == null,
+ "Manager is already started");
+ }
- if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+ // Without the possibility to log in, neither the TGT renewal nor the tokens update is started.
+ if (!kerberosLoginProvider.isLoginPossible()) {
LOG.info("Renewal is NOT possible, skipping to start renewal task");
return;
}
startTGTRenewal();
+ startTokensUpdate();
}
+ @VisibleForTesting
void startTGTRenewal() throws IOException {
- LOG.debug("Starting TGT renewal task");
+ LOG.info("Starting TGT renewal task");
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
if (currentUser.isFromKeytab()) {
@@ -173,15 +262,16 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
LOG.warn("Error while renewing TGT", e);
}
}),
- tgtRenewalPeriod,
+ 0,
tgtRenewalPeriod,
TimeUnit.MILLISECONDS);
- LOG.debug("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
+ LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
} else {
- LOG.debug("TGT renewal task not started");
+ LOG.info("TGT renewal task not started");
}
}
+ @VisibleForTesting
void stopTGTRenewal() {
if (tgtRenewalFuture != null) {
tgtRenewalFuture.cancel(true);
@@ -189,11 +279,78 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
}
}
+    /**
+     * Obtains delegation tokens with a fresh Kerberos login and schedules the next run of itself.
+     *
+     * <p>On success the next run is scheduled with the delay computed by {@link
+     * #calculateRenewalDelay(Clock, long)}; if no renewal time was reported nothing is scheduled.
+     * On failure a retry is scheduled after the configured retry backoff. An interruption is
+     * treated as a shutdown signal: nothing is rescheduled and the interrupt flag of the thread is
+     * restored so that the owner of the thread can still observe it.
+     */
+    @VisibleForTesting
+    void startTokensUpdate() {
+        try {
+            LOG.info("Starting tokens update task");
+            Credentials credentials = new Credentials();
+            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+            Optional<Long> nextRenewal =
+                    freshUGI.doAs(
+                            (PrivilegedExceptionAction<Optional<Long>>)
+                                    () -> obtainDelegationTokensAndGetNextRenewal(credentials));
+            if (nextRenewal.isPresent()) {
+                long renewalDelay =
+                        calculateRenewalDelay(Clock.systemDefaultZone(), nextRenewal.get());
+                scheduleTokensUpdate(renewalDelay);
+                LOG.info("Tokens update task started with {} ms delay", renewalDelay);
+            } else {
+                LOG.warn(
+                        "Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date");
+            }
+        } catch (InterruptedException e) {
+            // May happen if shutting down: do not reschedule, but do not lose the interruption.
+            Thread.currentThread().interrupt();
+            LOG.debug("Interrupted", e);
+        } catch (Exception e) {
+            scheduleTokensUpdate(renewalRetryBackoffPeriod);
+            LOG.warn(
+                    "Failed to update tokens, will try again in {} ms",
+                    renewalRetryBackoffPeriod,
+                    e);
+        }
+    }
+
+    /**
+     * Schedules a single run of {@link #startTokensUpdate()} after the given delay. The scheduled
+     * task only hands the actual update over to the IO executor.
+     */
+    private void scheduleTokensUpdate(long delayMillis) {
+        synchronized (tokensUpdateFutureLock) {
+            tokensUpdateFuture =
+                    scheduledExecutor.schedule(
+                            () -> ioExecutor.execute(this::startTokensUpdate),
+                            delayMillis,
+                            TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /** Cancels the scheduled tokens update, if there is one, and forgets its future. */
+    @VisibleForTesting
+    void stopTokensUpdate() {
+        synchronized (tokensUpdateFutureLock) {
+            final ScheduledFuture<?> pendingUpdate = tokensUpdateFuture;
+            if (pendingUpdate == null) {
+                return;
+            }
+            pendingUpdate.cancel(true);
+            tokensUpdateFuture = null;
+        }
+    }
+
+    /**
+     * Computes how long to wait before the tokens are obtained again.
+     *
+     * @param clock source of the current time
+     * @param nextRenewal renewal time in epoch milliseconds, as compared against the clock
+     * @return the configured renewal ratio applied to the time left until {@code nextRenewal},
+     *     rounded to whole milliseconds
+     */
+    @VisibleForTesting
+    long calculateRenewalDelay(Clock clock, long nextRenewal) {
+        final long currentTimeMillis = clock.millis();
+        final long remainingMillis = nextRenewal - currentTimeMillis;
+        final long delayMillis = Math.round(tokensRenewalTimeRatio * remainingMillis);
+        LOG.debug(
+                "Calculated delay on renewal is {}, based on next renewal {} and the ratio {}, and current time {}",
+                delayMillis,
+                nextRenewal,
+                tokensRenewalTimeRatio,
+                currentTimeMillis);
+        return delayMillis;
+    }
+
/** Stops re-occurring token obtain task. */
@Override
public void stop() {
LOG.info("Stopping credential renewal");
+ stopTokensUpdate();
stopTGTRenewal();
LOG.info("Stopped credential renewal");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
new file mode 100644
index 00000000000..a4e1f786fd4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.hadoop.HadoopUserUtils;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides Kerberos login functionality, either from a keytab or from the ticket cache of the
+ * current user. Proxy users are rejected.
+ */
+public class KerberosLoginProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KerberosLoginProvider.class);
+
+    /** Principal from the security configuration; a non-null value selects the keytab login. */
+    private final String principal;
+
+    /** Keytab from the security configuration, used together with the principal. */
+    private final String keytab;
+
+    /** Whether logging in from the ticket cache is allowed by the security configuration. */
+    private final boolean useTicketCache;
+
+    public KerberosLoginProvider(Configuration configuration) {
+        checkNotNull(configuration, "Flink configuration must not be null");
+        final SecurityConfiguration securityConfig = new SecurityConfiguration(configuration);
+        principal = securityConfig.getPrincipal();
+        keytab = securityConfig.getKeytab();
+        useTicketCache = securityConfig.useTicketCache();
+    }
+
+    /**
+     * Checks whether a Kerberos login can be performed with the current setup.
+     *
+     * @return {@code true} if a principal is configured, or if the ticket cache may be used and
+     *     the current user has Kerberos credentials; {@code false} otherwise
+     * @throws IOException if the current Hadoop user cannot be determined
+     * @throws UnsupportedOperationException if no principal is configured and the current user
+     *     is a proxy user
+     */
+    public boolean isLoginPossible() throws IOException {
+        final UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+        if (principal != null) {
+            LOG.debug("Login from keytab is possible");
+            return true;
+        }
+        if (HadoopUserUtils.isProxyUser(user)) {
+            throw proxyUserNotSupported();
+        }
+        if (useTicketCache && user.hasKerberosCredentials()) {
+            LOG.debug("Login from ticket cache is possible");
+            return true;
+        }
+
+        LOG.debug("Login is NOT possible");
+        return false;
+    }
+
+    /**
+     * Performs the Kerberos login: from the keytab if a principal is configured, otherwise from
+     * the ticket cache of the current user.
+     *
+     * @return the logged-in user
+     * @throws IOException if the current Hadoop user cannot be determined or the login fails
+     * @throws UnsupportedOperationException if no principal is configured and the current user
+     *     is a proxy user
+     */
+    public UserGroupInformation doLogin() throws IOException {
+        final UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+        if (principal != null) {
+            LOG.info(
+                    "Attempting to login to KDC using principal: {} keytab: {}", principal, keytab);
+            final UserGroupInformation keytabUser =
+                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+            LOG.info("Successfully logged into KDC");
+            return keytabUser;
+        }
+        if (HadoopUserUtils.isProxyUser(user)) {
+            throw proxyUserNotSupported();
+        }
+
+        LOG.info("Attempting to load user's ticket cache");
+        // Cache location and principal come from the environment; the principal falls back to
+        // the name of the current user.
+        final String ticketCache = System.getenv("KRB5CCNAME");
+        final String ticketCacheUser =
+                Optional.ofNullable(System.getenv("KRB5PRINCIPAL")).orElse(user.getUserName());
+        final UserGroupInformation ticketCacheUgi =
+                UserGroupInformation.getUGIFromTicketCache(ticketCache, ticketCacheUser);
+        LOG.info("Loaded user's ticket cache successfully");
+        return ticketCacheUgi;
+    }
+
+    /** Creates the exception which signals that proxy users cannot be used for the login. */
+    private static UnsupportedOperationException proxyUserNotSupported() {
+        return new UnsupportedOperationException("Proxy user is not supported");
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
deleted file mode 100644
index 05a11806be8..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security.token;
-
-import org.apache.flink.runtime.security.SecurityConfiguration;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** It checks whether kerberos credentials can be renewed. */
-public class KerberosRenewalPossibleProvider {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(KerberosRenewalPossibleProvider.class);
-
- private final SecurityConfiguration securityConfiguration;
-
- public KerberosRenewalPossibleProvider(SecurityConfiguration securityConfiguration) {
- this.securityConfiguration =
- checkNotNull(
- securityConfiguration, "Flink security configuration must not be null");
- }
-
- public boolean isRenewalPossible() throws IOException {
- if (!StringUtils.isBlank(securityConfiguration.getKeytab())
- && !StringUtils.isBlank(securityConfiguration.getPrincipal())) {
- LOG.debug("Login from keytab is possible");
- return true;
- }
- LOG.debug("Login from keytab is NOT possible");
-
- if (securityConfiguration.useTicketCache() && hasCurrentUserCredentials()) {
- LOG.debug("Login from ticket cache is possible");
- return true;
- }
- LOG.debug("Login from ticket cache is NOT possible");
-
- return false;
- }
-
- protected boolean hasCurrentUserCredentials() throws IOException {
- return UserGroupInformation.getCurrentUser().hasKerberosCredentials();
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java
new file mode 100644
index 00000000000..2fa454a084c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import sun.security.krb5.KrbException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link HadoopUserUtils}. */
+class HadoopUserUtilsTest {
+
+    /**
+     * Points the JVM Kerberos settings to empty values and refreshes the Kerberos config;
+     * presumably so that the config can be initialized without a real krb5 setup.
+     */
+    @BeforeAll
+    public static void setPropertiesToEnableKerberosConfigInit() throws KrbException {
+        System.setProperty("java.security.krb5.realm", "");
+        System.setProperty("java.security.krb5.kdc", "");
+        System.setProperty("java.security.krb5.conf", "/dev/null");
+        sun.security.krb5.Config.refresh();
+    }
+
+    @Test
+    public void testIsProxyUserShouldReturnFalseWhenNormalUser() {
+        final UserGroupInformation kerberosUser =
+                userInKerberosEnvironment(AuthenticationMethod.KERBEROS);
+
+        assertFalse(HadoopUserUtils.isProxyUser(kerberosUser));
+    }
+
+    @Test
+    public void testIsProxyUserShouldReturnTrueWhenProxyUser() {
+        final UserGroupInformation proxyUser =
+                userInKerberosEnvironment(AuthenticationMethod.PROXY);
+
+        assertTrue(HadoopUserUtils.isProxyUser(proxyUser));
+    }
+
+    /**
+     * Configures Hadoop security for Kerberos authentication and then creates a remote test user
+     * with the given authentication method.
+     */
+    private static UserGroupInformation userInKerberosEnvironment(
+            AuthenticationMethod userAuthenticationMethod) {
+        final Configuration hadoopConf = new Configuration(true);
+        hadoopConf.set("hadoop.security.authentication", AuthenticationMethod.KERBEROS.name());
+        UserGroupInformation.setConfiguration(hadoopConf);
+
+        final UserGroupInformation testUser = UserGroupInformation.createRemoteUser("test-user");
+        testUser.setAuthenticationMethod(userAuthenticationMethod);
+        return testUser;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
index f40d4f95e40..fb35133bb19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
@@ -27,7 +27,13 @@ import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import java.io.IOException;
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import static java.time.Instant.ofEpochMilli;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -102,7 +108,7 @@ public class KerberosDelegationTokenManagerITCase {
}
@Test
- public void testStartTGTRenewalShouldScheduleRenewal() throws IOException {
+ public void startTGTRenewalShouldScheduleRenewal() throws IOException {
final ManuallyTriggeredScheduledExecutor scheduledExecutor =
new ManuallyTriggeredScheduledExecutor();
final ManuallyTriggeredScheduledExecutorService scheduler =
@@ -127,4 +133,68 @@ public class KerberosDelegationTokenManagerITCase {
verify(userGroupInformation, times(1)).checkTGTAndReloginFromKeytab();
}
}
+
+ /**
+ * Verifies that the tokens update keeps rescheduling itself, including after a failed login:
+ * the update is started once directly and triggered twice through the executors, and the
+ * login of the second run throws.
+ */
+ @Test
+ public void startTokensUpdateShouldScheduleRenewal() throws IOException {
+ final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+ new ManuallyTriggeredScheduledExecutor();
+ final ManuallyTriggeredScheduledExecutorService scheduler =
+ new ManuallyTriggeredScheduledExecutorService();
+
+ try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+ UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+ ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+ ExceptionThrowingDelegationTokenProvider.enabled = false;
+ ExceptionThrowingDelegationTokenProvider.constructed = false;
+ Configuration configuration = new Configuration();
+ configuration.setBoolean("security.kerberos.token.provider.throw.enabled", false);
+ AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
+ AtomicBoolean retryExceptionThrown = new AtomicBoolean(false);
+ // Login provider which fails exactly during the second tokens update run.
+ KerberosLoginProvider kerberosLoginProvider =
+ new KerberosLoginProvider(configuration) {
+ @Override
+ public UserGroupInformation doLogin() {
+ if (startTokensUpdateCallCount.get() == 2) {
+ retryExceptionThrown.set(true);
+ throw new RuntimeException("Intended exception to test retry");
+ }
+ return userGroupInformation;
+ }
+ };
+ // Manager which counts how often the tokens update runs.
+ KerberosDelegationTokenManager delegationTokenManager =
+ new KerberosDelegationTokenManager(
+ configuration, scheduledExecutor, scheduler, kerberosLoginProvider) {
+ @Override
+ void startTokensUpdate() {
+ startTokensUpdateCallCount.incrementAndGet();
+ super.startTokensUpdate();
+ }
+ };
+
+ // First run is started directly. Each following pair first fires the scheduled task,
+ // which only hands the update over to the IO executor, and then runs that update.
+ delegationTokenManager.startTokensUpdate();
+ scheduledExecutor.triggerScheduledTasks();
+ scheduler.triggerAll();
+ scheduledExecutor.triggerScheduledTasks();
+ scheduler.triggerAll();
+ delegationTokenManager.stopTokensUpdate();
+
+ // A third run can only happen if a retry was scheduled after the failed second run.
+ assertTrue(retryExceptionThrown.get());
+ assertEquals(3, startTokensUpdateCallCount.get());
+ }
+ }
+
+    /** Verifies that the renewal delay is the remaining time scaled by the configured ratio. */
+    @Test
+    public void calculateRenewalDelayShouldConsiderRenewalRatio() {
+        ExceptionThrowingDelegationTokenProvider.enabled = false;
+        ExceptionThrowingDelegationTokenProvider.constructed = false;
+        final Configuration conf = new Configuration();
+        conf.setBoolean("security.kerberos.token.provider.throw.enabled", false);
+        conf.set(KERBEROS_TOKENS_RENEWAL_TIME_RATIO, 0.5);
+        final KerberosDelegationTokenManager manager =
+                new KerberosDelegationTokenManager(conf, null, null);
+
+        // Now is 100 and the renewal is at 200, so 100 ms remain and half of that is 50 ms.
+        final Clock frozenClock = Clock.fixed(ofEpochMilli(100), ZoneId.systemDefault());
+        final long renewalDelay = manager.calculateRenewalDelay(frozenClock, 200);
+        assertEquals(50, renewalDelay);
+    }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderTest.java
new file mode 100644
index 00000000000..debe531c403
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+/** Test for {@link KerberosLoginProvider}. */
+public class KerberosLoginProviderTest {
+
+ @Test // An empty Configuration plus a current user with no stubbed Kerberos state must not allow login.
+ public void isLoginPossibleMustReturnFalseByDefault() throws IOException {
+ Configuration configuration = new Configuration();
+ KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ // Static UserGroupInformation calls are mocked inside the try block; the current user is an unstubbed mock.
+ try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+ UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+ ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+ assertFalse(kerberosLoginProvider.isLoginPossible());
+ }
+ }
+
+ @Test // A configured principal together with an existing keytab file must make login possible.
+ public void isLoginPossibleMustReturnTrueWithKeytab(@TempDir Path tmpDir) throws IOException {
+ Configuration configuration = new Configuration();
+ configuration.setString(KERBEROS_LOGIN_PRINCIPAL, "principal");
+ final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab")); // empty file; only its path is configured
+ configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
+ KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ // The mocked current user is left unstubbed here; no Kerberos credentials are simulated for it.
+ try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+ UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+ ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+ assertTrue(kerberosLoginProvider.isLoginPossible());
+ }
+ }
+
+ @Test // Ticket cache enabled and a current user reporting Kerberos credentials must make login possible.
+ public void isLoginPossibleMustReturnTrueWithTGT() throws IOException {
+ Configuration configuration = new Configuration();
+ configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, true);
+ KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ // The current user is stubbed as KERBEROS-authenticated with credentials present.
+ try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+ UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+ when(userGroupInformation.getAuthenticationMethod())
+ .thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS);
+ when(userGroupInformation.hasKerberosCredentials()).thenReturn(true);
+ ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+ assertTrue(kerberosLoginProvider.isLoginPossible());
+ }
+ }
+
+ @Test // A current user with PROXY authentication must make isLoginPossible throw.
+ public void isLoginPossibleMustThrowExceptionWithProxyUser() {
+ Configuration configuration = new Configuration();
+ KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ // The mocked current user reports AuthenticationMethod.PROXY.
+ try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+ UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+ when(userGroupInformation.getAuthenticationMethod())
+ .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
+ ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+ // The call is expected to be rejected with UnsupportedOperationException.
+ assertThrows(
+ UnsupportedOperationException.class, kerberosLoginProvider::isLoginPossible);
+ }
+ }
+
+ @Test // With principal and keytab configured, doLogin must call loginUserFromKeytabAndReturnUGI.
+ public void doLoginMustLoginWithKeytab(@TempDir Path tmpDir) throws IOException {
+ Configuration configuration = new Configuration();
+ configuration.setString(KERBEROS_LOGIN_PRINCIPAL, "principal");
+ final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab")); // empty file; only its path is configured
+ configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
+ KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ // UserGroupInformation statics are mocked in this scope, so no real Kerberos login is performed.
+ try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+ UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+ ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+ // Only the invocation is verified; anyString() does not pin the principal or keytab values.
+ kerberosLoginProvider.doLogin();
+ ugi.verify(
+ () ->
+ UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+ anyString(), anyString()));
+ }
+ }
+
+ @Test // With ticket cache enabled and Kerberos credentials present, doLogin must use the ticket cache.
+ public void doLoginMustLoginWithTGT() throws IOException {
+ Configuration configuration = new Configuration();
+ configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, true);
+ KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ // The current user is stubbed as KERBEROS-authenticated with credentials present.
+ try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+ UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+ when(userGroupInformation.getAuthenticationMethod())
+ .thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS);
+ when(userGroupInformation.hasKerberosCredentials()).thenReturn(true);
+ ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+ // Verifies that the static ticket-cache lookup is invoked with both arguments null.
+ kerberosLoginProvider.doLogin();
+ ugi.verify(() -> UserGroupInformation.getUGIFromTicketCache(null, null));
+ }
+ }
+
+ @Test // A current user with PROXY authentication must make doLogin throw.
+ public void doLoginMustThrowExceptionWithProxyUser() {
+ Configuration configuration = new Configuration();
+ KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ // The mocked current user reports AuthenticationMethod.PROXY.
+ try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+ UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+ when(userGroupInformation.getAuthenticationMethod())
+ .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
+ ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+ // The call is expected to be rejected with UnsupportedOperationException.
+ assertThrows(UnsupportedOperationException.class, kerberosLoginProvider::doLogin);
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
deleted file mode 100644
index a52ad68f79f..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security.token;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.SecurityConfiguration;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
-import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL;
-import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-class KerberosRenewalPossibleProviderTest {
- @Test
- public void isRenewalPossibleMustGiveBackFalseByDefault() throws IOException {
- Configuration configuration = new Configuration();
- configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, false);
- SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
- KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
- new KerberosRenewalPossibleProvider(securityConfiguration);
-
- assertFalse(kerberosRenewalPossibleProvider.isRenewalPossible());
- }
-
- @Test
- public void isRenewalPossibleMustGiveBackTrueWhenKeytab(@TempDir Path tmpDir)
- throws IOException {
- Configuration configuration = new Configuration();
- configuration.setString(KERBEROS_LOGIN_PRINCIPAL, "principal");
- final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
- configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
- SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
- KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
- new KerberosRenewalPossibleProvider(securityConfiguration);
-
- assertTrue(kerberosRenewalPossibleProvider.isRenewalPossible());
- }
-
- @Test
- public void isRenewalPossibleMustGiveBackTrueWhenTGT() throws IOException {
- Configuration configuration = new Configuration();
- configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, true);
- SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
- KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
- new KerberosRenewalPossibleProvider(securityConfiguration) {
- @Override
- protected boolean hasCurrentUserCredentials() {
- return true;
- }
- };
-
- assertTrue(kerberosRenewalPossibleProvider.isRenewalPossible());
- }
-}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 39c59a3b4f1..0ae2e699fde 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1288,7 +1288,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
});
}
- private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws IOException {
+ private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws Exception {
LOG.info("Adding delegation tokens to the AM container.");
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();