You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/13 13:37:07 UTC

[flink] branch master updated: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3950987de04 [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager
3950987de04 is described below

commit 3950987de0496fcdca778994498768430faea95f
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Mon May 23 10:03:54 2022 +0200

    [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager
---
 .../generated/security_auth_kerberos_section.html  |  12 ++
 .../generated/security_configuration.html          |  12 ++
 .../flink/configuration/SecurityOptions.java       |  16 ++
 .../apache/flink/runtime/util/HadoopUtilsTest.java |   4 +-
 .../flink/runtime/hadoop/HadoopUserUtils.java      |  32 ++++
 .../security/token/DelegationTokenManager.java     |   2 +-
 .../token/KerberosDelegationTokenManager.java      | 179 +++++++++++++++++++--
 .../security/token/KerberosLoginProvider.java      | 101 ++++++++++++
 .../token/KerberosRenewalPossibleProvider.java     |  66 --------
 .../flink/runtime/hadoop/HadoopUserUtilsTest.java  |  72 +++++++++
 .../KerberosDelegationTokenManagerITCase.java      |  72 ++++++++-
 .../security/token/KerberosLoginProviderTest.java  | 160 ++++++++++++++++++
 .../token/KerberosRenewalPossibleProviderTest.java |  78 ---------
 .../apache/flink/yarn/YarnClusterDescriptor.java   |   2 +-
 14 files changed, 648 insertions(+), 160 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
index 638ee4c9ec0..b0cbc3179bd 100644
--- a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
+++ b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
@@ -44,5 +44,17 @@
             <td>Duration</td>
             <td>The time period when keytab login happens automatically in order to always have a valid TGT.</td>
         </tr>
+        <tr>
+            <td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td>
+        </tr>
+        <tr>
+            <td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Double</td>
+            <td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/security_configuration.html b/docs/layouts/shortcodes/generated/security_configuration.html
index 87a3aab06d5..e0b9e6b0fc5 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -56,6 +56,18 @@
             <td>Duration</td>
             <td>The time period when keytab login happens automatically in order to always have a valid TGT.</td>
         </tr>
+        <tr>
+            <td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td>
+        </tr>
+        <tr>
+            <td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Double</td>
+            <td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td>
+        </tr>
         <tr>
             <td><h5>security.module.factory.classes</h5></td>
             <td style="word-wrap: break-word;">"org.apache.flink.runtime.security.modules.HadoopModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.JaasModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 9c125959c84..e67d3edc5b2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -132,6 +132,22 @@ public class SecurityOptions {
                     .withDescription(
                             "The time period when keytab login happens automatically in order to always have a valid TGT.");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Duration> KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF =
+            key("security.kerberos.tokens.renewal.retry.backoff")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "The time period how long to wait before retrying to obtain new delegation tokens after a failure.");
+
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Double> KERBEROS_TOKENS_RENEWAL_TIME_RATIO =
+            key("security.kerberos.tokens.renewal.time-ratio")
+                    .doubleType()
+                    .defaultValue(0.75)
+                    .withDescription(
+                            "Ratio of the tokens's expiration time when new credentials should be re-obtained.");
+
     // ------------------------------------------------------------------------
     //  ZooKeeper Security Options
     // ------------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
index 964508faad4..39c8ed722a7 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
@@ -32,10 +32,10 @@ import org.mockito.Mockito;
 import sun.security.krb5.KrbException;
 
 import static org.apache.flink.runtime.util.HadoopUtils.HDFS_DELEGATION_TOKEN_KIND;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeFalse;
 import static org.junit.Assume.assumeTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Unit tests for Hadoop utils. */
 public class HadoopUtilsTest extends TestLogger {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
new file mode 100644
index 00000000000..15bb508b0b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.hadoop;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Utility class for working with Hadoop user related classes. This should only be used if Hadoop is
+ * on the classpath.
+ */
+public class HadoopUserUtils {
+
+    public static boolean isProxyUser(UserGroupInformation ugi) {
+        return ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
index a4efe308c4e..34869b49cd8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
@@ -32,7 +32,7 @@ public interface DelegationTokenManager {
     /**
      * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
      */
-    void obtainDelegationTokens(Credentials credentials);
+    void obtainDelegationTokens(Credentials credentials) throws Exception;
 
     /**
      * Creates a re-occurring task which obtains new tokens and automatically distributes them to
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
index 20fd44d6fc2..5a755ba91ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
@@ -29,16 +29,23 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.Clock;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -56,7 +63,11 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
-    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+    private final double tokensRenewalTimeRatio;
+
+    private final long renewalRetryBackoffPeriod;
+
+    private final KerberosLoginProvider kerberosLoginProvider;
 
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
@@ -66,16 +77,37 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     @Nullable private ScheduledFuture<?> tgtRenewalFuture;
 
+    private final Object tokensUpdateFutureLock = new Object();
+
+    @GuardedBy("tokensUpdateFutureLock")
+    @Nullable
+    private ScheduledFuture<?> tokensUpdateFuture;
+
     public KerberosDelegationTokenManager(
             Configuration configuration,
             @Nullable ScheduledExecutor scheduledExecutor,
             @Nullable ExecutorService ioExecutor) {
+        this(
+                configuration,
+                scheduledExecutor,
+                ioExecutor,
+                new KerberosLoginProvider(configuration));
+    }
+
+    public KerberosDelegationTokenManager(
+            Configuration configuration,
+            @Nullable ScheduledExecutor scheduledExecutor,
+            @Nullable ExecutorService ioExecutor,
+            KerberosLoginProvider kerberosLoginProvider) {
         this.configuration = checkNotNull(configuration, "Flink configuration must not be null");
+        SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
+        this.tokensRenewalTimeRatio = configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO);
+        this.renewalRetryBackoffPeriod =
+                configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
+        this.kerberosLoginProvider = kerberosLoginProvider;
+        this.delegationTokenProviders = loadProviders();
         this.scheduledExecutor = scheduledExecutor;
         this.ioExecutor = ioExecutor;
-        this.kerberosRenewalPossibleProvider =
-                new KerberosRenewalPossibleProvider(new SecurityConfiguration(configuration));
-        this.delegationTokenProviders = loadProviders();
     }
 
     private Map<String, DelegationTokenProvider> loadProviders() {
@@ -127,8 +159,59 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
      * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
      */
     @Override
-    public void obtainDelegationTokens(Credentials credentials) {
+    public void obtainDelegationTokens(Credentials credentials) throws Exception {
         LOG.info("Obtaining delegation tokens");
+
+        // Delegation tokens can only be obtained if the real user has Kerberos credentials, so
+        // skip creation when those are not available.
+        if (kerberosLoginProvider.isLoginPossible()) {
+            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+            freshUGI.doAs(
+                    (PrivilegedExceptionAction<Void>)
+                            () -> {
+                                obtainDelegationTokensAndGetNextRenewal(credentials);
+                                return null;
+                            });
+            LOG.info("Delegation tokens obtained successfully");
+        } else {
+            LOG.info("Real user has no kerberos credentials so no tokens obtained");
+        }
+    }
+
+    protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {
+        Optional<Long> nextRenewal =
+                delegationTokenProviders.values().stream()
+                        .map(
+                                provider -> {
+                                    Optional<Long> nr = Optional.empty();
+                                    if (provider.delegationTokensRequired()) {
+                                        LOG.debug(
+                                                "Obtaining delegation token for service {}",
+                                                provider.serviceName());
+                                        nr = provider.obtainDelegationTokens(credentials);
+                                        LOG.debug(
+                                                "Obtained delegation token for service {} successfully",
+                                                provider.serviceName());
+                                    } else {
+                                        LOG.debug(
+                                                "Service {} does not need to obtain delegation token",
+                                                provider.serviceName());
+                                    }
+                                    return nr;
+                                })
+                        .flatMap(nr -> nr.map(Stream::of).orElseGet(Stream::empty))
+                        .min(Long::compare);
+
+        credentials
+                .getAllTokens()
+                .forEach(
+                        token ->
+                                LOG.debug(
+                                        "Token Service:{} Identifier:{}",
+                                        token.getService(),
+                                        token.getIdentifier()));
+
+        return nextRenewal;
     }
 
     /**
@@ -139,18 +222,24 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
     public void start() throws Exception {
         checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
         checkNotNull(ioExecutor, "IO executor must not be null");
-        checkState(tgtRenewalFuture == null, "Manager is already started");
+        synchronized (tokensUpdateFutureLock) {
+            checkState(
+                    tgtRenewalFuture == null && tokensUpdateFuture == null,
+                    "Manager is already started");
+        }
 
-        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+        if (!kerberosLoginProvider.isLoginPossible()) {
             LOG.info("Renewal is NOT possible, skipping to start renewal task");
             return;
         }
 
         startTGTRenewal();
+        startTokensUpdate();
     }
 
+    @VisibleForTesting
     void startTGTRenewal() throws IOException {
-        LOG.debug("Starting TGT renewal task");
+        LOG.info("Starting TGT renewal task");
 
         UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
         if (currentUser.isFromKeytab()) {
@@ -173,15 +262,16 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
                                                     LOG.warn("Error while renewing TGT", e);
                                                 }
                                             }),
-                            tgtRenewalPeriod,
+                            0,
                             tgtRenewalPeriod,
                             TimeUnit.MILLISECONDS);
-            LOG.debug("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
+            LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
         } else {
-            LOG.debug("TGT renewal task not started");
+            LOG.info("TGT renewal task not started");
         }
     }
 
+    @VisibleForTesting
     void stopTGTRenewal() {
         if (tgtRenewalFuture != null) {
             tgtRenewalFuture.cancel(true);
@@ -189,11 +279,78 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
         }
     }
 
+    @VisibleForTesting
+    void startTokensUpdate() {
+        try {
+            LOG.info("Starting tokens update task");
+            Credentials credentials = new Credentials();
+            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+            Optional<Long> nextRenewal =
+                    freshUGI.doAs(
+                            (PrivilegedExceptionAction<Optional<Long>>)
+                                    () -> obtainDelegationTokensAndGetNextRenewal(credentials));
+            if (nextRenewal.isPresent()) {
+                long renewalDelay =
+                        calculateRenewalDelay(Clock.systemDefaultZone(), nextRenewal.get());
+                synchronized (tokensUpdateFutureLock) {
+                    tokensUpdateFuture =
+                            scheduledExecutor.schedule(
+                                    () -> ioExecutor.execute(this::startTokensUpdate),
+                                    renewalDelay,
+                                    TimeUnit.MILLISECONDS);
+                }
+                LOG.info("Tokens update task started with {} ms delay", renewalDelay);
+            } else {
+                LOG.warn(
+                        "Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date");
+            }
+        } catch (InterruptedException e) {
+            // Ignore, may happen if shutting down.
+            LOG.debug("Interrupted", e);
+        } catch (Exception e) {
+            synchronized (tokensUpdateFutureLock) {
+                tokensUpdateFuture =
+                        scheduledExecutor.schedule(
+                                () -> ioExecutor.execute(this::startTokensUpdate),
+                                renewalRetryBackoffPeriod,
+                                TimeUnit.MILLISECONDS);
+            }
+            LOG.warn(
+                    "Failed to update tokens, will try again in {} ms",
+                    renewalRetryBackoffPeriod,
+                    e);
+        }
+    }
+
+    @VisibleForTesting
+    void stopTokensUpdate() {
+        synchronized (tokensUpdateFutureLock) {
+            if (tokensUpdateFuture != null) {
+                tokensUpdateFuture.cancel(true);
+                tokensUpdateFuture = null;
+            }
+        }
+    }
+
+    @VisibleForTesting
+    long calculateRenewalDelay(Clock clock, long nextRenewal) {
+        long now = clock.millis();
+        long renewalDelay = Math.round(tokensRenewalTimeRatio * (nextRenewal - now));
+        LOG.debug(
+                "Calculated delay on renewal is {}, based on next renewal {} and the ratio {}, and current time {}",
+                renewalDelay,
+                nextRenewal,
+                tokensRenewalTimeRatio,
+                now);
+        return renewalDelay;
+    }
+
     /** Stops re-occurring token obtain task. */
     @Override
     public void stop() {
         LOG.info("Stopping credential renewal");
 
+        stopTokensUpdate();
         stopTGTRenewal();
 
         LOG.info("Stopped credential renewal");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
new file mode 100644
index 00000000000..a4e1f786fd4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.hadoop.HadoopUserUtils;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Provides Kerberos login functionality. */
+public class KerberosLoginProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KerberosLoginProvider.class);
+
+    private final String principal;
+
+    private final String keytab;
+
+    private final boolean useTicketCache;
+
+    public KerberosLoginProvider(Configuration configuration) {
+        checkNotNull(configuration, "Flink configuration must not be null");
+        SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
+        this.principal = securityConfiguration.getPrincipal();
+        this.keytab = securityConfiguration.getKeytab();
+        this.useTicketCache = securityConfiguration.useTicketCache();
+    }
+
+    public boolean isLoginPossible() throws IOException {
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+        if (principal != null) {
+            LOG.debug("Login from keytab is possible");
+            return true;
+        } else if (!HadoopUserUtils.isProxyUser(currentUser)) {
+            if (useTicketCache && currentUser.hasKerberosCredentials()) {
+                LOG.debug("Login from ticket cache is possible");
+                return true;
+            }
+        } else {
+            throwProxyUserNotSupported();
+        }
+
+        LOG.debug("Login is NOT possible");
+
+        return false;
+    }
+
+    public UserGroupInformation doLogin() throws IOException {
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+        if (principal != null) {
+            LOG.info(
+                    "Attempting to login to KDC using principal: {} keytab: {}", principal, keytab);
+            UserGroupInformation ugi =
+                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+            LOG.info("Successfully logged into KDC");
+            return ugi;
+        } else if (!HadoopUserUtils.isProxyUser(currentUser)) {
+            LOG.info("Attempting to load user's ticket cache");
+            final String ccache = System.getenv("KRB5CCNAME");
+            final String user =
+                    Optional.ofNullable(System.getenv("KRB5PRINCIPAL"))
+                            .orElse(currentUser.getUserName());
+            UserGroupInformation ugi = UserGroupInformation.getUGIFromTicketCache(ccache, user);
+            LOG.info("Loaded user's ticket cache successfully");
+            return ugi;
+        } else {
+            throwProxyUserNotSupported();
+            return currentUser;
+        }
+    }
+
+    private void throwProxyUserNotSupported() {
+        throw new UnsupportedOperationException("Proxy user is not supported");
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
deleted file mode 100644
index 05a11806be8..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security.token;
-
-import org.apache.flink.runtime.security.SecurityConfiguration;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** It checks whether kerberos credentials can be renewed. */
-public class KerberosRenewalPossibleProvider {
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(KerberosRenewalPossibleProvider.class);
-
-    private final SecurityConfiguration securityConfiguration;
-
-    public KerberosRenewalPossibleProvider(SecurityConfiguration securityConfiguration) {
-        this.securityConfiguration =
-                checkNotNull(
-                        securityConfiguration, "Flink security configuration must not be null");
-    }
-
-    public boolean isRenewalPossible() throws IOException {
-        if (!StringUtils.isBlank(securityConfiguration.getKeytab())
-                && !StringUtils.isBlank(securityConfiguration.getPrincipal())) {
-            LOG.debug("Login from keytab is possible");
-            return true;
-        }
-        LOG.debug("Login from keytab is NOT possible");
-
-        if (securityConfiguration.useTicketCache() && hasCurrentUserCredentials()) {
-            LOG.debug("Login from ticket cache is possible");
-            return true;
-        }
-        LOG.debug("Login from ticket cache is NOT possible");
-
-        return false;
-    }
-
-    protected boolean hasCurrentUserCredentials() throws IOException {
-        return UserGroupInformation.getCurrentUser().hasKerberosCredentials();
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java
new file mode 100644
index 00000000000..2fa454a084c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import sun.security.krb5.KrbException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for Hadoop user utils. */
+class HadoopUserUtilsTest {
+
+    @BeforeAll
+    public static void setPropertiesToEnableKerberosConfigInit() throws KrbException {
+        System.setProperty("java.security.krb5.realm", "");
+        System.setProperty("java.security.krb5.kdc", "");
+        System.setProperty("java.security.krb5.conf", "/dev/null");
+        sun.security.krb5.Config.refresh();
+    }
+
+    @Test
+    public void testIsProxyUserShouldReturnFalseWhenNormalUser() {
+        UserGroupInformation.setConfiguration(
+                getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+        UserGroupInformation userGroupInformation = createTestUser(AuthenticationMethod.KERBEROS);
+
+        assertFalse(HadoopUserUtils.isProxyUser(userGroupInformation));
+    }
+
+    @Test
+    public void testIsProxyUserShouldReturnTrueWhenProxyUser() {
+        UserGroupInformation.setConfiguration(
+                getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+        UserGroupInformation userGroupInformation = createTestUser(AuthenticationMethod.PROXY);
+
+        assertTrue(HadoopUserUtils.isProxyUser(userGroupInformation));
+    }
+
+    private static Configuration getHadoopConfigWithAuthMethod(
+            AuthenticationMethod authenticationMethod) {
+        Configuration conf = new Configuration(true);
+        conf.set("hadoop.security.authentication", authenticationMethod.name());
+        return conf;
+    }
+
+    private static UserGroupInformation createTestUser(AuthenticationMethod authenticationMethod) {
+        UserGroupInformation user = UserGroupInformation.createRemoteUser("test-user");
+        user.setAuthenticationMethod(authenticationMethod);
+        return user;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
index f40d4f95e40..fb35133bb19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
@@ -27,7 +27,13 @@ import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 
 import java.io.IOException;
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static java.time.Instant.ofEpochMilli;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -102,7 +108,7 @@ public class KerberosDelegationTokenManagerITCase {
     }
 
     @Test
-    public void testStartTGTRenewalShouldScheduleRenewal() throws IOException {
+    public void startTGTRenewalShouldScheduleRenewal() throws IOException {
         final ManuallyTriggeredScheduledExecutor scheduledExecutor =
                 new ManuallyTriggeredScheduledExecutor();
         final ManuallyTriggeredScheduledExecutorService scheduler =
@@ -127,4 +133,68 @@ public class KerberosDelegationTokenManagerITCase {
             verify(userGroupInformation, times(1)).checkTGTAndReloginFromKeytab();
         }
     }
+
+    @Test
+    public void startTokensUpdateShouldScheduleRenewal() throws IOException {
+        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        final ManuallyTriggeredScheduledExecutorService scheduler =
+                new ManuallyTriggeredScheduledExecutorService();
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            ExceptionThrowingDelegationTokenProvider.enabled = false;
+            ExceptionThrowingDelegationTokenProvider.constructed = false;
+            Configuration configuration = new Configuration();
+            configuration.setBoolean("security.kerberos.token.provider.throw.enabled", false);
+            AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
+            AtomicBoolean retryExceptionThrown = new AtomicBoolean(false);
+            KerberosLoginProvider kerberosLoginProvider =
+                    new KerberosLoginProvider(configuration) {
+                        @Override
+                        public UserGroupInformation doLogin() {
+                            if (startTokensUpdateCallCount.get() == 2) {
+                                retryExceptionThrown.set(true);
+                                throw new RuntimeException("Intended exception to test retry");
+                            }
+                            return userGroupInformation;
+                        }
+                    };
+            KerberosDelegationTokenManager delegationTokenManager =
+                    new KerberosDelegationTokenManager(
+                            configuration, scheduledExecutor, scheduler, kerberosLoginProvider) {
+                        @Override
+                        void startTokensUpdate() {
+                            startTokensUpdateCallCount.incrementAndGet();
+                            super.startTokensUpdate();
+                        }
+                    };
+
+            delegationTokenManager.startTokensUpdate();
+            scheduledExecutor.triggerScheduledTasks();
+            scheduler.triggerAll();
+            scheduledExecutor.triggerScheduledTasks();
+            scheduler.triggerAll();
+            delegationTokenManager.stopTokensUpdate();
+
+            assertTrue(retryExceptionThrown.get());
+            assertEquals(3, startTokensUpdateCallCount.get());
+        }
+    }
+
+    @Test
+    public void calculateRenewalDelayShouldConsiderRenewalRatio() {
+        ExceptionThrowingDelegationTokenProvider.enabled = false;
+        ExceptionThrowingDelegationTokenProvider.constructed = false;
+        Configuration configuration = new Configuration();
+        configuration.setBoolean("security.kerberos.token.provider.throw.enabled", false);
+        configuration.set(KERBEROS_TOKENS_RENEWAL_TIME_RATIO, 0.5);
+        KerberosDelegationTokenManager delegationTokenManager =
+                new KerberosDelegationTokenManager(configuration, null, null);
+
+        Clock constantClock = Clock.fixed(ofEpochMilli(100), ZoneId.systemDefault());
+        assertEquals(50, delegationTokenManager.calculateRenewalDelay(constantClock, 200));
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderTest.java
new file mode 100644
index 00000000000..debe531c403
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+/** Test for {@link KerberosLoginProvider}. */
+public class KerberosLoginProviderTest {
+
+    @Test
+    public void isLoginPossibleMustReturnFalseByDefault() throws IOException {
+        Configuration configuration = new Configuration();
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertFalse(kerberosLoginProvider.isLoginPossible());
+        }
+    }
+
+    @Test
+    public void isLoginPossibleMustReturnTrueWithKeytab(@TempDir Path tmpDir) throws IOException {
+        Configuration configuration = new Configuration();
+        configuration.setString(KERBEROS_LOGIN_PRINCIPAL, "principal");
+        final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
+        configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertTrue(kerberosLoginProvider.isLoginPossible());
+        }
+    }
+
+    @Test
+    public void isLoginPossibleMustReturnTrueWithTGT() throws IOException {
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, true);
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            when(userGroupInformation.getAuthenticationMethod())
+                    .thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS);
+            when(userGroupInformation.hasKerberosCredentials()).thenReturn(true);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertTrue(kerberosLoginProvider.isLoginPossible());
+        }
+    }
+
+    @Test
+    public void isLoginPossibleMustThrowExceptionWithProxyUser() {
+        Configuration configuration = new Configuration();
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            when(userGroupInformation.getAuthenticationMethod())
+                    .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertThrows(
+                    UnsupportedOperationException.class, kerberosLoginProvider::isLoginPossible);
+        }
+    }
+
+    @Test
+    public void doLoginMustLoginWithKeytab(@TempDir Path tmpDir) throws IOException {
+        Configuration configuration = new Configuration();
+        configuration.setString(KERBEROS_LOGIN_PRINCIPAL, "principal");
+        final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
+        configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            kerberosLoginProvider.doLogin();
+            ugi.verify(
+                    () ->
+                            UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+                                    anyString(), anyString()));
+        }
+    }
+
+    @Test
+    public void doLoginMustLoginWithTGT() throws IOException {
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, true);
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            when(userGroupInformation.getAuthenticationMethod())
+                    .thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS);
+            when(userGroupInformation.hasKerberosCredentials()).thenReturn(true);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            kerberosLoginProvider.doLogin();
+            ugi.verify(() -> UserGroupInformation.getUGIFromTicketCache(null, null));
+        }
+    }
+
+    @Test
+    public void doLoginMustThrowExceptionWithProxyUser() {
+        Configuration configuration = new Configuration();
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            when(userGroupInformation.getAuthenticationMethod())
+                    .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertThrows(UnsupportedOperationException.class, kerberosLoginProvider::doLogin);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
deleted file mode 100644
index a52ad68f79f..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security.token;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.SecurityConfiguration;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
-import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL;
-import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-class KerberosRenewalPossibleProviderTest {
-    @Test
-    public void isRenewalPossibleMustGiveBackFalseByDefault() throws IOException {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, false);
-        SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
-        KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
-                new KerberosRenewalPossibleProvider(securityConfiguration);
-
-        assertFalse(kerberosRenewalPossibleProvider.isRenewalPossible());
-    }
-
-    @Test
-    public void isRenewalPossibleMustGiveBackTrueWhenKeytab(@TempDir Path tmpDir)
-            throws IOException {
-        Configuration configuration = new Configuration();
-        configuration.setString(KERBEROS_LOGIN_PRINCIPAL, "principal");
-        final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
-        configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
-        SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
-        KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
-                new KerberosRenewalPossibleProvider(securityConfiguration);
-
-        assertTrue(kerberosRenewalPossibleProvider.isRenewalPossible());
-    }
-
-    @Test
-    public void isRenewalPossibleMustGiveBackTrueWhenTGT() throws IOException {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, true);
-        SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
-        KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
-                new KerberosRenewalPossibleProvider(securityConfiguration) {
-                    @Override
-                    protected boolean hasCurrentUserCredentials() {
-                        return true;
-                    }
-                };
-
-        assertTrue(kerberosRenewalPossibleProvider.isRenewalPossible());
-    }
-}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 39c59a3b4f1..0ae2e699fde 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1288,7 +1288,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
                         });
     }
 
-    private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws IOException {
+    private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws Exception {
         LOG.info("Adding delegation tokens to the AM container.");
 
         Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();