You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/07/05 11:35:34 UTC

[flink] branch master updated: [FLINK-25908][runtime][security] Add HadoopFSDelegationTokenProvider

This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a89152713aa [FLINK-25908][runtime][security] Add HadoopFSDelegationTokenProvider
a89152713aa is described below

commit a89152713aa58647841c46ed2335b45d24c553f9
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Tue Jul 5 13:35:27 2022 +0200

    [FLINK-25908][runtime][security] Add HadoopFSDelegationTokenProvider
---
 .../security/token/DelegationTokenProvider.java    |   6 +-
 .../token/HadoopFSDelegationTokenProvider.java     | 277 +++++++++++++++++++++
 .../token/KerberosDelegationTokenManager.java      |  45 ++--
 ....runtime.security.token.DelegationTokenProvider |  16 ++
 .../HadoopFSDelegationTokenProviderITCase.java     | 194 +++++++++++++++
 .../KerberosDelegationTokenManagerITCase.java      |   3 +-
 .../token/TestDelegationTokenIdentifier.java       |  64 +++++
 ...rg.apache.hadoop.security.token.TokenIdentifier |  16 ++
 .../apache/flink/yarn/YarnClusterDescriptor.java   |   2 +-
 9 files changed, 601 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
index ae434fbd4b1..39764133d35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
@@ -39,14 +39,14 @@ public interface DelegationTokenProvider {
      *
      * @param configuration Configuration to initialize the provider.
      */
-    void init(Configuration configuration);
+    void init(Configuration configuration) throws Exception;
 
     /**
      * Return whether delegation tokens are required for this service.
      *
      * @return true if delegation tokens are required.
      */
-    boolean delegationTokensRequired();
+    boolean delegationTokensRequired() throws Exception;
 
     /**
      * Obtain delegation tokens for this service.
@@ -55,5 +55,5 @@ public interface DelegationTokenProvider {
      * @return If the returned tokens are renewable and can be renewed, return the time of the next
      *     renewal, otherwise `Optional.empty()` should be returned.
      */
-    Optional<Long> obtainDelegationTokens(Credentials credentials);
+    Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
new file mode 100644
index 00000000000..a3033a15a87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** Delegation token provider for Hadoop filesystems. */
+@Experimental
+public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class);
+
+    private Configuration flinkConfiguration;
+
+    private org.apache.hadoop.conf.Configuration hadoopConfiguration;
+
+    private Optional<Long> tokenRenewalInterval;
+
+    /** Returns the service name of this provider, which is {@code "hadoopfs"}. */
+    @Override
+    public String serviceName() {
+        return "hadoopfs";
+    }
+
+    /**
+     * Stores the given Flink configuration and derives the Hadoop configuration from it via
+     * {@code HadoopUtils.getHadoopConfiguration}.
+     *
+     * @param configuration Flink configuration to initialize the provider with.
+     */
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        flinkConfiguration = configuration;
+        hadoopConfiguration = HadoopUtils.getHadoopConfiguration(configuration);
+    }
+
+    /**
+     * Reports whether delegation tokens are required. The decision is delegated to {@code
+     * HadoopUtils.isKerberosSecurityEnabled}, evaluated for the current Hadoop user.
+     *
+     * @return the result of the Kerberos security check for the current user.
+     */
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        return HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser());
+    }
+
+    /**
+     * Obtains delegation tokens for all filesystems to access and stores them in the given
+     * credentials.
+     *
+     * @param credentials Credentials the obtained tokens are added to.
+     * @return the earliest renewal date of the obtained tokens, or {@code Optional.empty()} when
+     *     no renewal interval or renewal date could be determined.
+     */
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        final Clock systemClock = Clock.systemDefaultZone();
+        final Set<FileSystem> targetFileSystems = getFileSystemsToAccess();
+
+        // The tokens handed out through the credentials are requested with a null renewer.
+        obtainDelegationTokens(null, targetFileSystems, credentials);
+
+        // The renewal interval is resolved on the first call only and cached afterwards.
+        if (tokenRenewalInterval == null) {
+            tokenRenewalInterval = getTokenRenewalInterval(systemClock, targetFileSystems);
+        }
+
+        if (!tokenRenewalInterval.isPresent()) {
+            // Without a renewal interval there is no renewal date to report.
+            return Optional.empty();
+        }
+        return getTokenRenewalDate(systemClock, credentials, tokenRenewalInterval.get());
+    }
+
+    /**
+     * Collects the Hadoop filesystems for which delegation tokens are going to be obtained.
+     *
+     * <p>The result contains the default filesystem of the Hadoop configuration, the filesystems
+     * of the paths listed under {@link SecurityOptions#KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS}
+     * and, when the deployment target contains "yarn", the filesystem of the configured {@code
+     * yarn.staging-directory} (unless that value is blank).
+     *
+     * @return the set of filesystems to access.
+     * @throws IOException if the default filesystem or the staging directory filesystem cannot be
+     *     resolved.
+     * @throws FlinkRuntimeException if the filesystem of an additionally configured path cannot
+     *     be resolved.
+     */
+    private Set<FileSystem> getFileSystemsToAccess() throws IOException {
+        Set<FileSystem> result = new HashSet<>();
+
+        // Default filesystem
+        FileSystem defaultFileSystem = FileSystem.get(hadoopConfiguration);
+        LOG.debug(
+                "Adding Hadoop default filesystem to file systems to access {}", defaultFileSystem);
+        result.add(defaultFileSystem);
+        LOG.debug("Hadoop default filesystem added to file systems to access successfully");
+
+        // Additional filesystems
+        ConfigUtils.decodeListFromConfig(
+                        flinkConfiguration,
+                        SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS,
+                        Path::new)
+                .forEach(
+                        path -> {
+                            try {
+                                LOG.debug(
+                                        "Adding path's filesystem to file systems to access {}",
+                                        path);
+                                result.add(path.getFileSystem(hadoopConfiguration));
+                                LOG.debug(
+                                        "Path's filesystem added to file systems to access successfully");
+                            } catch (IOException e) {
+                                // The lambda cannot throw a checked exception, so wrap it.
+                                LOG.error("Failed to get filesystem for {}", path, e);
+                                throw new FlinkRuntimeException(e);
+                            }
+                        });
+
+        // YARN staging dir
+        // The deployment target may be unset, in which case the lookup yields null; guard
+        // against it instead of failing with a NullPointerException.
+        String deploymentTarget = flinkConfiguration.getString(DeploymentOptions.TARGET);
+        if (deploymentTarget != null && deploymentTarget.toLowerCase().contains("yarn")) {
+            LOG.debug("Running on YARN, trying to add staging directory to file systems to access");
+            String yarnStagingDirectory =
+                    flinkConfiguration.getString("yarn.staging-directory", "");
+            if (!StringUtils.isBlank(yarnStagingDirectory)) {
+                LOG.debug(
+                        "Adding staging directory to file systems to access {}",
+                        yarnStagingDirectory);
+                result.add(new Path(yarnStagingDirectory).getFileSystem(hadoopConfiguration));
+                LOG.debug("Staging directory added to file systems to access successfully");
+            } else {
+                LOG.debug(
+                        "Staging directory is not set or empty so not added to file systems to access");
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Requests delegation tokens from every given filesystem and adds them to the credentials.
+     *
+     * @param renewer Renewer to request the tokens with, may be {@code null}.
+     * @param fileSystemsToAccess Filesystems to obtain tokens from.
+     * @param credentials Credentials the obtained tokens are added to.
+     * @throws FlinkRuntimeException if obtaining the tokens fails for any filesystem.
+     */
+    protected void obtainDelegationTokens(
+            @Nullable String renewer,
+            Set<FileSystem> fileSystemsToAccess,
+            Credentials credentials) {
+        for (FileSystem fileSystem : fileSystemsToAccess) {
+            try {
+                LOG.debug(
+                        "Obtaining delegation token for {} with renewer {}",
+                        fileSystem,
+                        renewer);
+                fileSystem.addDelegationTokens(renewer, credentials);
+                LOG.debug("Delegation obtained successfully");
+            } catch (Exception cause) {
+                // Abort on the first failing filesystem, the remaining ones are not visited.
+                LOG.error("Failed to obtain delegation token for {}", fileSystem, cause);
+                throw new FlinkRuntimeException(cause);
+            }
+        }
+    }
+
+    /**
+     * Determines the token renewal interval.
+     *
+     * <p>A fresh set of tokens is obtained with the current user as renewer. Every token whose
+     * identifier is an {@link AbstractDelegationTokenIdentifier} is renewed, and its interval is
+     * the difference between the new expiration and the issue date. The smallest interval of all
+     * such tokens is returned.
+     *
+     * @param clock Clock used when resolving the issue date of a token.
+     * @param fileSystemsToAccess Filesystems to obtain the tokens from.
+     * @return the smallest renewal interval, or {@code Optional.empty()} when no token with an
+     *     {@link AbstractDelegationTokenIdentifier} was obtained.
+     * @throws IOException if the current user cannot be determined.
+     * @throws FlinkRuntimeException if decoding or renewing a token fails.
+     */
+    @VisibleForTesting
+    Optional<Long> getTokenRenewalInterval(Clock clock, Set<FileSystem> fileSystemsToAccess)
+            throws IOException {
+        // We cannot use the tokens generated with renewer yarn
+        // Trying to renew those will fail with an access control issue
+        // So create new tokens with the logged in user as renewer
+        String renewer = UserGroupInformation.getCurrentUser().getUserName();
+
+        // Separate credentials so these probe tokens do not leak into the caller's credentials
+        Credentials credentials = new Credentials();
+        obtainDelegationTokens(renewer, fileSystemsToAccess, credentials);
+
+        Optional<Long> result =
+                credentials.getAllTokens().stream()
+                        .filter(
+                                t -> {
+                                    try {
+                                        return t.decodeIdentifier()
+                                                instanceof AbstractDelegationTokenIdentifier;
+                                    } catch (IOException e) {
+                                        throw new FlinkRuntimeException(e);
+                                    }
+                                })
+                        .map(
+                                t -> {
+                                    try {
+                                        long newExpiration = t.renew(hadoopConfiguration);
+                                        AbstractDelegationTokenIdentifier identifier =
+                                                (AbstractDelegationTokenIdentifier)
+                                                        t.decodeIdentifier();
+                                        String tokenKind = t.getKind().toString();
+                                        long interval =
+                                                newExpiration
+                                                        - getIssueDate(
+                                                                clock, tokenKind, identifier);
+                                        LOG.debug(
+                                                "Renewal interval is {} for token {}",
+                                                interval,
+                                                tokenKind);
+                                        return interval;
+                                    } catch (Exception e) {
+                                        throw new FlinkRuntimeException(e);
+                                    }
+                                })
+                        .min(Long::compare);
+
+        LOG.debug("Global renewal interval is {}", result);
+
+        return result;
+    }
+
+    /**
+     * Calculates the earliest renewal date of the tokens stored in the given credentials.
+     *
+     * <p>Only tokens whose identifier is an {@link AbstractDelegationTokenIdentifier} are
+     * considered. The renewal date of a token is its issue date plus the renewal interval.
+     *
+     * @param clock Clock used when resolving the issue date of a token.
+     * @param credentials Credentials holding the tokens to inspect.
+     * @param renewalInterval Renewal interval added to each issue date.
+     * @return the smallest renewal date, or {@code Optional.empty()} when the interval is
+     *     negative or no matching token exists.
+     */
+    @VisibleForTesting
+    Optional<Long> getTokenRenewalDate(Clock clock, Credentials credentials, long renewalInterval) {
+        if (renewalInterval < 0) {
+            LOG.debug("Negative renewal interval so no renewal date is calculated");
+            return Optional.empty();
+        }
+
+        Optional<Long> result =
+                credentials.getAllTokens().stream()
+                        .filter(
+                                t -> {
+                                    try {
+                                        return t.decodeIdentifier()
+                                                instanceof AbstractDelegationTokenIdentifier;
+                                    } catch (IOException e) {
+                                        throw new FlinkRuntimeException(e);
+                                    }
+                                })
+                        .map(
+                                t -> {
+                                    try {
+                                        AbstractDelegationTokenIdentifier identifier =
+                                                (AbstractDelegationTokenIdentifier)
+                                                        t.decodeIdentifier();
+                                        String tokenKind = t.getKind().toString();
+                                        long date =
+                                                getIssueDate(clock, tokenKind, identifier)
+                                                        + renewalInterval;
+                                        LOG.debug(
+                                                "Renewal date is {} for token {}", date, tokenKind);
+                                        return date;
+                                    } catch (Exception e) {
+                                        throw new FlinkRuntimeException(e);
+                                    }
+                                })
+                        // The earliest date wins so that no token is renewed too late
+                        .min(Long::compare);
+
+        LOG.debug("Global renewal date is {}", result);
+
+        return result;
+    }
+
+    /**
+     * Resolves the issue date to use for a token.
+     *
+     * <p>An issue date later than the current time is returned as is, with a warning. A positive
+     * issue date that is not in the future is returned as is. Any other value is treated as
+     * invalid and replaced by the current time, with a warning.
+     *
+     * @param clock Clock providing the current time.
+     * @param tokenKind Kind of the token, only used in log messages.
+     * @param identifier Identifier carrying the issue date of the token.
+     * @return the issue date of the token or the current time in milliseconds.
+     */
+    @VisibleForTesting
+    long getIssueDate(Clock clock, String tokenKind, AbstractDelegationTokenIdentifier identifier) {
+        final long currentMillis = clock.millis();
+        final long tokenIssueDate = identifier.getIssueDate();
+
+        // The future check must come first to keep the precedence of the conditions.
+        if (tokenIssueDate > currentMillis) {
+            LOG.warn(
+                    "Token {} has set up issue date later than current time. (provided: "
+                            + "{} / current timestamp: {}) Please make sure clocks are in sync between "
+                            + "machines. If the issue is not a clock mismatch, consult token implementor to check "
+                            + "whether issue date is valid.",
+                    tokenKind,
+                    tokenIssueDate,
+                    currentMillis);
+            return tokenIssueDate;
+        }
+
+        if (tokenIssueDate > 0) {
+            return tokenIssueDate;
+        }
+
+        LOG.warn(
+                "Token {} has not set up issue date properly. (provided: {}) "
+                        + "Using current timestamp ({}) as issue date instead. Consult token implementor to fix "
+                        + "the behavior.",
+                tokenKind,
+                tokenIssueDate,
+                currentMillis);
+        return currentMillis;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
index 5a755ba91ac..1682e7d9827 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.security.token;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.security.SecurityConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import org.apache.hadoop.security.Credentials;
@@ -130,12 +131,14 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
                             "Delegation token provider {} is disabled so not loaded",
                             provider.serviceName());
                 }
-            } catch (Exception e) {
+            } catch (Exception | NoClassDefFoundError e) {
                 LOG.error(
-                        "Failed to initialize delegation token provider {}.",
+                        "Failed to initialize delegation token provider {}",
                         provider.serviceName(),
                         e);
-                throw e;
+                if (!(e instanceof NoClassDefFoundError)) {
+                    throw new FlinkRuntimeException(e);
+                }
             }
         }
 
@@ -183,21 +186,29 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
                 delegationTokenProviders.values().stream()
                         .map(
                                 provider -> {
-                                    Optional<Long> nr = Optional.empty();
-                                    if (provider.delegationTokensRequired()) {
-                                        LOG.debug(
-                                                "Obtaining delegation token for service {}",
-                                                provider.serviceName());
-                                        nr = provider.obtainDelegationTokens(credentials);
-                                        LOG.debug(
-                                                "Obtained delegation token for service {} successfully",
-                                                provider.serviceName());
-                                    } else {
-                                        LOG.debug(
-                                                "Service {} does not need to obtain delegation token",
-                                                provider.serviceName());
+                                    try {
+                                        Optional<Long> nr = Optional.empty();
+                                        if (provider.delegationTokensRequired()) {
+                                            LOG.debug(
+                                                    "Obtaining delegation token for service {}",
+                                                    provider.serviceName());
+                                            nr = provider.obtainDelegationTokens(credentials);
+                                            LOG.debug(
+                                                    "Obtained delegation token for service {} successfully",
+                                                    provider.serviceName());
+                                        } else {
+                                            LOG.debug(
+                                                    "Service {} does not need to obtain delegation token",
+                                                    provider.serviceName());
+                                        }
+                                        return nr;
+                                    } catch (Exception e) {
+                                        LOG.error(
+                                                "Failed to obtain delegation token for provider {}",
+                                                provider.serviceName(),
+                                                e);
+                                        throw new FlinkRuntimeException(e);
                                     }
-                                    return nr;
                                 })
                         .flatMap(nr -> nr.map(Stream::of).orElseGet(Stream::empty))
                         .min(Long::compare);
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
new file mode 100644
index 00000000000..56619ee46d5
--- /dev/null
+++ b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.security.token.HadoopFSDelegationTokenProvider
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
new file mode 100644
index 00000000000..0708069510b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.time.Instant.ofEpochMilli;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for {@link HadoopFSDelegationTokenProvider}. */
+class HadoopFSDelegationTokenProviderITCase {
+
+    private static final long NOW = 100;
+
+    private static final String masterPrincipal = "MasterPrincipal";
+
+    final Text tokenService1 = new Text("TEST_TOKEN_SERVICE1");
+    final Text tokenService2 = new Text("TEST_TOKEN_SERVICE2");
+
+    /**
+     * Token stub whose {@link #renew(Configuration)} returns a preset expiration instead of
+     * contacting any service. It is static because it uses no state of the enclosing test.
+     */
+    private static class TestDelegationToken extends Token<TestDelegationTokenIdentifier> {
+
+        /** Expiration reported by {@link #renew(Configuration)}. */
+        private final long newExpiration;
+
+        /**
+         * Creates a token for the given service and identifier.
+         *
+         * @param tokenService Service of the token.
+         * @param identifier Identifier providing the token bytes and kind.
+         * @param newExpiration Expiration returned when the token is renewed.
+         */
+        public TestDelegationToken(
+                Text tokenService, TestDelegationTokenIdentifier identifier, long newExpiration) {
+            super(identifier.getBytes(), new byte[4], identifier.getKind(), tokenService);
+            this.newExpiration = newExpiration;
+        }
+
+        /** Creates a token whose renewal reports an expiration of {@code 0}. */
+        public TestDelegationToken(Text tokenService, TestDelegationTokenIdentifier identifier) {
+            this(tokenService, identifier, 0L);
+        }
+
+        @Override
+        public long renew(Configuration conf) {
+            return newExpiration;
+        }
+    }
+
+    @Test
+    public void getTokenRenewalIntervalShouldReturnNoneWhenNoTokens() throws IOException {
+        // Provider stub which never adds any token to the credentials.
+        final HadoopFSDelegationTokenProvider tokenlessProvider =
+                new HadoopFSDelegationTokenProvider() {
+                    @Override
+                    protected void obtainDelegationTokens(
+                            String renewer,
+                            Set<FileSystem> fileSystemsToAccess,
+                            Credentials credentials) {}
+                };
+        final Clock epochClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+
+        final Optional<Long> interval =
+                tokenlessProvider.getTokenRenewalInterval(epochClock, Collections.emptySet());
+
+        assertEquals(Optional.empty(), interval);
+    }
+
+    @Test
+    public void getTokenRenewalIntervalShouldReturnMinWhenMultipleTokens() throws IOException {
+        // Provider stub handing out two tokens issued at NOW which expire one and two
+        // milliseconds later when renewed.
+        final HadoopFSDelegationTokenProvider twoTokenProvider =
+                new HadoopFSDelegationTokenProvider() {
+                    @Override
+                    protected void obtainDelegationTokens(
+                            String renewer,
+                            Set<FileSystem> fileSystemsToAccess,
+                            Credentials credentials) {
+                        final TestDelegationTokenIdentifier firstIdentifier =
+                                new TestDelegationTokenIdentifier(NOW);
+                        final TestDelegationTokenIdentifier secondIdentifier =
+                                new TestDelegationTokenIdentifier(NOW);
+
+                        credentials.addToken(
+                                tokenService1,
+                                new TestDelegationToken(tokenService1, firstIdentifier, NOW + 1));
+                        credentials.addToken(
+                                tokenService2,
+                                new TestDelegationToken(tokenService2, secondIdentifier, NOW + 2));
+                    }
+                };
+        final Clock fixedClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+
+        final Optional<Long> interval =
+                twoTokenProvider.getTokenRenewalInterval(fixedClock, Collections.emptySet());
+
+        assertEquals(Optional.of(1L), interval);
+    }
+
+    @Test
+    public void getTokenRenewalDateShouldReturnNoneWhenNegativeRenewalInterval() {
+        final Clock epochClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+        final Credentials emptyCredentials = new Credentials();
+
+        // A negative interval must short-circuit regardless of the credentials content.
+        final Optional<Long> renewalDate =
+                new HadoopFSDelegationTokenProvider()
+                        .getTokenRenewalDate(epochClock, emptyCredentials, -1);
+
+        assertEquals(Optional.empty(), renewalDate);
+    }
+
+    @Test
+    public void getTokenRenewalDateShouldReturnNoneWhenNoTokens() {
+        final Clock epochClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+        final Credentials emptyCredentials = new Credentials();
+
+        // The interval is valid, but there is no token to compute a date for.
+        final Optional<Long> renewalDate =
+                new HadoopFSDelegationTokenProvider()
+                        .getTokenRenewalDate(epochClock, emptyCredentials, 1);
+
+        assertEquals(Optional.empty(), renewalDate);
+    }
+
+    @Test
+    public void getTokenRenewalDateShouldReturnMinWhenMultipleTokens() {
+        final long renewalInterval = 1;
+        final Credentials credentials = new Credentials();
+        // Two tokens whose issue dates differ by one millisecond.
+        credentials.addToken(
+                tokenService1,
+                new TestDelegationToken(tokenService1, new TestDelegationTokenIdentifier(NOW)));
+        credentials.addToken(
+                tokenService2,
+                new TestDelegationToken(
+                        tokenService2, new TestDelegationTokenIdentifier(NOW + 1)));
+        final Clock fixedClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+
+        final Optional<Long> renewalDate =
+                new HadoopFSDelegationTokenProvider()
+                        .getTokenRenewalDate(fixedClock, credentials, renewalInterval);
+
+        // The earlier issue date plus the interval is expected.
+        assertEquals(Optional.of(NOW + renewalInterval), renewalDate);
+    }
+
+    @Test
+    public void getIssueDateShouldReturnIssueDateWithFutureToken() {
+        // Issue date lies one millisecond after the fixed clock.
+        final long futureIssueDate = NOW + 1;
+        final AbstractDelegationTokenIdentifier identifier =
+                new TestDelegationTokenIdentifier(futureIssueDate);
+        final String kind = identifier.getKind().toString();
+        final Clock fixedClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+
+        final long actual =
+                new HadoopFSDelegationTokenProvider().getIssueDate(fixedClock, kind, identifier);
+
+        assertEquals(futureIssueDate, actual);
+    }
+
+    @Test
+    public void getIssueDateShouldReturnIssueDateWithPastToken() {
+        // Issue date lies one millisecond before the fixed clock.
+        final long pastIssueDate = NOW - 1;
+        final AbstractDelegationTokenIdentifier identifier =
+                new TestDelegationTokenIdentifier(pastIssueDate);
+        final String kind = identifier.getKind().toString();
+        final Clock fixedClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+
+        final long actual =
+                new HadoopFSDelegationTokenProvider().getIssueDate(fixedClock, kind, identifier);
+
+        assertEquals(pastIssueDate, actual);
+    }
+
+    @Test
+    public void getIssueDateShouldReturnNowWithInvalidToken() {
+        // A negative issue date is invalid, so the clock time is expected instead.
+        final long invalidIssueDate = -1;
+        final AbstractDelegationTokenIdentifier identifier =
+                new TestDelegationTokenIdentifier(invalidIssueDate);
+        final String kind = identifier.getKind().toString();
+        final Clock fixedClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+
+        final long actual =
+                new HadoopFSDelegationTokenProvider().getIssueDate(fixedClock, kind, identifier);
+
+        assertEquals(NOW, actual);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
index fb35133bb19..1cac17dc221 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
@@ -101,7 +101,8 @@ public class KerberosDelegationTokenManagerITCase {
         KerberosDelegationTokenManager delegationTokenManager =
                 new KerberosDelegationTokenManager(configuration, null, null);
 
-        assertEquals(1, delegationTokenManager.delegationTokenProviders.size());
+        assertEquals(2, delegationTokenManager.delegationTokenProviders.size());
+        assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs"));
         assertTrue(delegationTokenManager.isProviderLoaded("test"));
         assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
new file mode 100644
index 00000000000..0d041d2b2a3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Minimal {@link AbstractDelegationTokenIdentifier} for integration tests. Only the issue date
+ * is kept and serialized; the token kind is the fixed value {@code TEST_TOKEN_KIND}.
+ */
+public class TestDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
+
+    private static final Text TOKEN_KIND = new Text("TEST_TOKEN_KIND");
+
+    private long issueDate;
+
+    // Public no-arg constructor, required so the service loader can instantiate this class.
+    public TestDelegationTokenIdentifier() {}
+
+    public TestDelegationTokenIdentifier(long issueDate) {
+        this.issueDate = issueDate;
+    }
+
+    @Override
+    public Text getKind() {
+        return TOKEN_KIND;
+    }
+
+    @Override
+    public long getIssueDate() {
+        return this.issueDate;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(this.issueDate);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.issueDate = in.readLong();
+    }
+}
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
new file mode 100644
index 00000000000..59e6d84d595
--- /dev/null
+++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.security.token.TestDelegationTokenIdentifier
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 5cba3a233bd..04b35098e8b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1294,7 +1294,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
     private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws Exception {
         LOG.info("Adding delegation tokens to the AM container.");
 
-        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+        Credentials credentials = new Credentials();
 
         DelegationTokenManager delegationTokenManager =
                 new KerberosDelegationTokenManager(flinkConfiguration, null, null);