You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/07/05 11:35:34 UTC
[flink] branch master updated: [FLINK-25908][runtime][security] Add HadoopFSDelegationTokenProvider
This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a89152713aa [FLINK-25908][runtime][security] Add HadoopFSDelegationTokenProvider
a89152713aa is described below
commit a89152713aa58647841c46ed2335b45d24c553f9
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Tue Jul 5 13:35:27 2022 +0200
[FLINK-25908][runtime][security] Add HadoopFSDelegationTokenProvider
---
.../security/token/DelegationTokenProvider.java | 6 +-
.../token/HadoopFSDelegationTokenProvider.java | 277 +++++++++++++++++++++
.../token/KerberosDelegationTokenManager.java | 45 ++--
....runtime.security.token.DelegationTokenProvider | 16 ++
.../HadoopFSDelegationTokenProviderITCase.java | 194 +++++++++++++++
.../KerberosDelegationTokenManagerITCase.java | 3 +-
.../token/TestDelegationTokenIdentifier.java | 64 +++++
...rg.apache.hadoop.security.token.TokenIdentifier | 16 ++
.../apache/flink/yarn/YarnClusterDescriptor.java | 2 +-
9 files changed, 601 insertions(+), 22 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
index ae434fbd4b1..39764133d35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
@@ -39,14 +39,14 @@ public interface DelegationTokenProvider {
*
* @param configuration Configuration to initialize the provider.
*/
- void init(Configuration configuration);
+ void init(Configuration configuration) throws Exception;
/**
* Return whether delegation tokens are required for this service.
*
* @return true if delegation tokens are required.
*/
- boolean delegationTokensRequired();
+ boolean delegationTokensRequired() throws Exception;
/**
* Obtain delegation tokens for this service.
@@ -55,5 +55,5 @@ public interface DelegationTokenProvider {
* @return If the returned tokens are renewable and can be renewed, return the time of the next
* renewal, otherwise `Optional.empty()` should be returned.
*/
- Optional<Long> obtainDelegationTokens(Credentials credentials);
+ Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
new file mode 100644
index 00000000000..a3033a15a87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Delegation token provider for Hadoop filesystems.
+ *
+ * <p>Tokens are requested for the default Hadoop filesystem, for every path listed under {@link
+ * SecurityOptions#KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS} and, when the deployment target contains
+ * "yarn", for the filesystem of the configured {@code yarn.staging-directory}.
+ */
+@Experimental
+public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class);
+
+    private Configuration flinkConfiguration;
+
+    private org.apache.hadoop.conf.Configuration hadoopConfiguration;
+
+    /**
+     * Cached renewal interval. Stays {@code null} until {@link
+     * #obtainDelegationTokens(Credentials)} has computed it once.
+     */
+    private Optional<Long> tokenRenewalInterval;
+
+    /** Returns the name under which this provider is registered: {@code "hadoopfs"}. */
+    @Override
+    public String serviceName() {
+        return "hadoopfs";
+    }
+
+    /**
+     * Stores the Flink configuration and derives the Hadoop configuration from it.
+     *
+     * @param configuration Configuration to initialize the provider.
+     */
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        flinkConfiguration = configuration;
+        hadoopConfiguration = HadoopUtils.getHadoopConfiguration(configuration);
+    }
+
+    /**
+     * Reports whether tokens are needed, which is decided by asking {@link HadoopUtils} whether
+     * Kerberos security is enabled for the current user.
+     */
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        return HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser());
+    }
+
+    /**
+     * Obtains tokens for all filesystems to access and adds them to the given credentials.
+     *
+     * @param credentials Credentials the obtained tokens are added to.
+     * @return The earliest renewal date over all obtained tokens, or {@code Optional.empty()} if
+     *     no renewal interval or renewal date could be determined.
+     */
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Clock clock = Clock.systemDefaultZone();
+        Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess();
+
+        obtainDelegationTokens(null, fileSystemsToAccess, credentials);
+
+        // The renewal interval is computed on the first call only and cached afterwards.
+        if (tokenRenewalInterval == null) {
+            tokenRenewalInterval = getTokenRenewalInterval(clock, fileSystemsToAccess);
+        }
+        return tokenRenewalInterval.flatMap(
+                interval -> getTokenRenewalDate(clock, credentials, interval));
+    }
+
+    /**
+     * Collects the filesystems for which delegation tokens must be obtained.
+     *
+     * @return The default filesystem, the filesystems of all additionally configured paths and,
+     *     on YARN, the filesystem of the staging directory if one is configured.
+     * @throws IOException If the default or the staging directory filesystem cannot be resolved.
+     * @throws FlinkRuntimeException If the filesystem of an additionally configured path cannot
+     *     be resolved.
+     */
+    private Set<FileSystem> getFileSystemsToAccess() throws IOException {
+        Set<FileSystem> result = new HashSet<>();
+
+        // Default filesystem
+        FileSystem defaultFileSystem = FileSystem.get(hadoopConfiguration);
+        LOG.debug(
+                "Adding Hadoop default filesystem to file systems to access {}", defaultFileSystem);
+        result.add(defaultFileSystem);
+        LOG.debug("Hadoop default filesystem added to file systems to access successfully");
+
+        // Additional filesystems
+        for (Path path :
+                ConfigUtils.decodeListFromConfig(
+                        flinkConfiguration,
+                        SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS,
+                        Path::new)) {
+            try {
+                LOG.debug("Adding path's filesystem to file systems to access {}", path);
+                result.add(path.getFileSystem(hadoopConfiguration));
+                LOG.debug("Path's filesystem added to file systems to access successfully");
+            } catch (IOException e) {
+                LOG.error("Failed to get filesystem for {}", path, e);
+                throw new FlinkRuntimeException(e);
+            }
+        }
+
+        // YARN staging dir. The deployment target may be unset, so fall back to an empty string
+        // instead of dereferencing a null value.
+        String deploymentTarget = flinkConfiguration.getString(DeploymentOptions.TARGET, "");
+        if (deploymentTarget.toLowerCase().contains("yarn")) {
+            LOG.debug("Running on YARN, trying to add staging directory to file systems to access");
+            String yarnStagingDirectory =
+                    flinkConfiguration.getString("yarn.staging-directory", "");
+            if (!StringUtils.isBlank(yarnStagingDirectory)) {
+                LOG.debug(
+                        "Adding staging directory to file systems to access {}",
+                        yarnStagingDirectory);
+                result.add(new Path(yarnStagingDirectory).getFileSystem(hadoopConfiguration));
+                LOG.debug("Staging directory added to file systems to access successfully");
+            } else {
+                LOG.debug(
+                        "Staging directory is not set or empty so not added to file systems to access");
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Asks every given filesystem to add its delegation tokens to the credentials.
+     *
+     * @param renewer Renewer passed on to the filesystems, may be {@code null}.
+     * @param fileSystemsToAccess Filesystems to obtain tokens from.
+     * @param credentials Credentials the tokens are added to.
+     * @throws FlinkRuntimeException If any filesystem fails to provide its tokens.
+     */
+    protected void obtainDelegationTokens(
+            @Nullable String renewer,
+            Set<FileSystem> fileSystemsToAccess,
+            Credentials credentials) {
+        for (FileSystem fs : fileSystemsToAccess) {
+            try {
+                LOG.debug("Obtaining delegation token for {} with renewer {}", fs, renewer);
+                fs.addDelegationTokens(renewer, credentials);
+                LOG.debug("Delegation obtained successfully");
+            } catch (Exception e) {
+                LOG.error("Failed to obtain delegation token for {}", fs, e);
+                throw new FlinkRuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Determines the renewal interval by obtaining a separate set of tokens with the current user
+     * as renewer and renewing each of them once.
+     *
+     * @param clock Clock used when a token does not carry a usable issue date.
+     * @param fileSystemsToAccess Filesystems to obtain the tokens from.
+     * @return The smallest difference between the new expiration and the issue date over all
+     *     tokens whose identifier is an {@link AbstractDelegationTokenIdentifier}, or {@code
+     *     Optional.empty()} if there is no such token.
+     */
+    @VisibleForTesting
+    Optional<Long> getTokenRenewalInterval(Clock clock, Set<FileSystem> fileSystemsToAccess)
+            throws IOException {
+        // We cannot use the tokens generated with renewer yarn
+        // Trying to renew those will fail with an access control issue
+        // So create new tokens with the logged in user as renewer
+        String renewer = UserGroupInformation.getCurrentUser().getUserName();
+
+        Credentials credentials = new Credentials();
+        obtainDelegationTokens(renewer, fileSystemsToAccess, credentials);
+
+        Optional<Long> result =
+                credentials.getAllTokens().stream()
+                        .filter(
+                                t -> {
+                                    try {
+                                        return t.decodeIdentifier()
+                                                instanceof AbstractDelegationTokenIdentifier;
+                                    } catch (IOException e) {
+                                        throw new FlinkRuntimeException(e);
+                                    }
+                                })
+                        .map(
+                                t -> {
+                                    try {
+                                        long newExpiration = t.renew(hadoopConfiguration);
+                                        AbstractDelegationTokenIdentifier identifier =
+                                                (AbstractDelegationTokenIdentifier)
+                                                        t.decodeIdentifier();
+                                        String tokenKind = t.getKind().toString();
+                                        long interval =
+                                                newExpiration
+                                                        - getIssueDate(
+                                                                clock, tokenKind, identifier);
+                                        LOG.debug(
+                                                "Renewal interval is {} for token {}",
+                                                interval,
+                                                tokenKind);
+                                        return interval;
+                                    } catch (Exception e) {
+                                        throw new FlinkRuntimeException(e);
+                                    }
+                                })
+                        .min(Long::compare);
+
+        LOG.debug("Global renewal interval is {}", result);
+
+        return result;
+    }
+
+    /**
+     * Computes the earliest renewal date over all tokens in the credentials.
+     *
+     * @param clock Clock used when a token does not carry a usable issue date.
+     * @param credentials Credentials holding the tokens to inspect.
+     * @param renewalInterval Interval added to each token's issue date.
+     * @return The smallest (issue date + interval) over all tokens whose identifier is an {@link
+     *     AbstractDelegationTokenIdentifier}, or {@code Optional.empty()} if the interval is
+     *     negative or there is no such token.
+     */
+    @VisibleForTesting
+    Optional<Long> getTokenRenewalDate(Clock clock, Credentials credentials, long renewalInterval) {
+        if (renewalInterval < 0) {
+            LOG.debug("Negative renewal interval so no renewal date is calculated");
+            return Optional.empty();
+        }
+
+        Optional<Long> result =
+                credentials.getAllTokens().stream()
+                        .filter(
+                                t -> {
+                                    try {
+                                        return t.decodeIdentifier()
+                                                instanceof AbstractDelegationTokenIdentifier;
+                                    } catch (IOException e) {
+                                        throw new FlinkRuntimeException(e);
+                                    }
+                                })
+                        .map(
+                                t -> {
+                                    try {
+                                        AbstractDelegationTokenIdentifier identifier =
+                                                (AbstractDelegationTokenIdentifier)
+                                                        t.decodeIdentifier();
+                                        String tokenKind = t.getKind().toString();
+                                        long date =
+                                                getIssueDate(clock, tokenKind, identifier)
+                                                        + renewalInterval;
+                                        LOG.debug(
+                                                "Renewal date is {} for token {}", date, tokenKind);
+                                        return date;
+                                    } catch (Exception e) {
+                                        throw new FlinkRuntimeException(e);
+                                    }
+                                })
+                        .min(Long::compare);
+
+        LOG.debug("Global renewal date is {}", result);
+
+        return result;
+    }
+
+    /**
+     * Returns the issue date to use for a token.
+     *
+     * @param clock Clock providing the current time.
+     * @param tokenKind Token kind, used in log messages only.
+     * @param identifier Identifier carrying the issue date.
+     * @return The identifier's issue date if it is positive (a date in the future is returned as
+     *     is, with a warning), otherwise the current time of the clock.
+     */
+    @VisibleForTesting
+    long getIssueDate(Clock clock, String tokenKind, AbstractDelegationTokenIdentifier identifier) {
+        long now = clock.millis();
+        long issueDate = identifier.getIssueDate();
+
+        if (issueDate > now) {
+            LOG.warn(
+                    "Token {} has set up issue date later than current time. (provided: "
+                            + "{} / current timestamp: {}) Please make sure clocks are in sync between "
+                            + "machines. If the issue is not a clock mismatch, consult token implementor to check "
+                            + "whether issue date is valid.",
+                    tokenKind,
+                    issueDate,
+                    now);
+            return issueDate;
+        } else if (issueDate > 0) {
+            return issueDate;
+        } else {
+            LOG.warn(
+                    "Token {} has not set up issue date properly. (provided: {}) "
+                            + "Using current timestamp ({}) as issue date instead. Consult token implementor to fix "
+                            + "the behavior.",
+                    tokenKind,
+                    issueDate,
+                    now);
+            return now;
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
index 5a755ba91ac..1682e7d9827 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.security.token;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.SecurityConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.hadoop.security.Credentials;
@@ -130,12 +131,14 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
"Delegation token provider {} is disabled so not loaded",
provider.serviceName());
}
- } catch (Exception e) {
+ } catch (Exception | NoClassDefFoundError e) {
LOG.error(
- "Failed to initialize delegation token provider {}.",
+ "Failed to initialize delegation token provider {}",
provider.serviceName(),
e);
- throw e;
+ if (!(e instanceof NoClassDefFoundError)) {
+ throw new FlinkRuntimeException(e);
+ }
}
}
@@ -183,21 +186,29 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
delegationTokenProviders.values().stream()
.map(
provider -> {
- Optional<Long> nr = Optional.empty();
- if (provider.delegationTokensRequired()) {
- LOG.debug(
- "Obtaining delegation token for service {}",
- provider.serviceName());
- nr = provider.obtainDelegationTokens(credentials);
- LOG.debug(
- "Obtained delegation token for service {} successfully",
- provider.serviceName());
- } else {
- LOG.debug(
- "Service {} does not need to obtain delegation token",
- provider.serviceName());
+ try {
+ Optional<Long> nr = Optional.empty();
+ if (provider.delegationTokensRequired()) {
+ LOG.debug(
+ "Obtaining delegation token for service {}",
+ provider.serviceName());
+ nr = provider.obtainDelegationTokens(credentials);
+ LOG.debug(
+ "Obtained delegation token for service {} successfully",
+ provider.serviceName());
+ } else {
+ LOG.debug(
+ "Service {} does not need to obtain delegation token",
+ provider.serviceName());
+ }
+ return nr;
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to obtain delegation token for provider {}",
+ provider.serviceName(),
+ e);
+ throw new FlinkRuntimeException(e);
}
- return nr;
})
.flatMap(nr -> nr.map(Stream::of).orElseGet(Stream::empty))
.min(Long::compare);
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
new file mode 100644
index 00000000000..56619ee46d5
--- /dev/null
+++ b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.security.token.HadoopFSDelegationTokenProvider
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
new file mode 100644
index 00000000000..0708069510b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.time.Instant.ofEpochMilli;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for {@link HadoopFSDelegationTokenProvider}. */
+class HadoopFSDelegationTokenProviderITCase {
+
+    /** Fixed "current time" in milliseconds used by the constant clocks below. */
+    private static final long NOW = 100;
+
+    final Text tokenService1 = new Text("TEST_TOKEN_SERVICE1");
+    final Text tokenService2 = new Text("TEST_TOKEN_SERVICE2");
+
+    /**
+     * Token whose {@link #renew(Configuration)} returns a preset expiration. Declared static
+     * because it does not use any state of the enclosing test instance.
+     */
+    private static class TestDelegationToken extends Token<TestDelegationTokenIdentifier> {
+
+        private final long newExpiration;
+
+        public TestDelegationToken(
+                Text tokenService, TestDelegationTokenIdentifier identifier, long newExpiration) {
+            super(identifier.getBytes(), new byte[4], identifier.getKind(), tokenService);
+            this.newExpiration = newExpiration;
+        }
+
+        public TestDelegationToken(Text tokenService, TestDelegationTokenIdentifier identifier) {
+            this(tokenService, identifier, 0L);
+        }
+
+        @Override
+        public long renew(Configuration conf) {
+            return newExpiration;
+        }
+    }
+
+    @Test
+    public void getTokenRenewalIntervalShouldReturnNoneWhenNoTokens() throws IOException {
+        // The overridden method adds no tokens, so there is nothing to compute an interval from.
+        HadoopFSDelegationTokenProvider provider =
+                new HadoopFSDelegationTokenProvider() {
+                    @Override
+                    protected void obtainDelegationTokens(
+                            String renewer,
+                            Set<FileSystem> fileSystemsToAccess,
+                            Credentials credentials) {}
+                };
+        Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+
+        assertEquals(
+                Optional.empty(),
+                provider.getTokenRenewalInterval(constantClock, Collections.emptySet()));
+    }
+
+    @Test
+    public void getTokenRenewalIntervalShouldReturnMinWhenMultipleTokens() throws IOException {
+        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+        // Both tokens are issued at NOW and expire at NOW + 1 and NOW + 2 respectively.
+        HadoopFSDelegationTokenProvider provider =
+                new HadoopFSDelegationTokenProvider() {
+                    @Override
+                    protected void obtainDelegationTokens(
+                            String renewer,
+                            Set<FileSystem> fileSystemsToAccess,
+                            Credentials credentials) {
+                        TestDelegationTokenIdentifier tokenIdentifier1 =
+                                new TestDelegationTokenIdentifier(NOW);
+                        credentials.addToken(
+                                tokenService1,
+                                new TestDelegationToken(tokenService1, tokenIdentifier1, NOW + 1));
+
+                        TestDelegationTokenIdentifier tokenIdentifier2 =
+                                new TestDelegationTokenIdentifier(NOW);
+                        credentials.addToken(
+                                tokenService2,
+                                new TestDelegationToken(tokenService2, tokenIdentifier2, NOW + 2));
+                    }
+                };
+
+        assertEquals(
+                Optional.of(1L),
+                provider.getTokenRenewalInterval(constantClock, Collections.emptySet()));
+    }
+
+    @Test
+    public void getTokenRenewalDateShouldReturnNoneWhenNegativeRenewalInterval() {
+        HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
+        Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+        Credentials credentials = new Credentials();
+
+        assertEquals(
+                Optional.empty(), provider.getTokenRenewalDate(constantClock, credentials, -1));
+    }
+
+    @Test
+    public void getTokenRenewalDateShouldReturnNoneWhenNoTokens() {
+        HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
+        Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+        Credentials credentials = new Credentials();
+
+        assertEquals(Optional.empty(), provider.getTokenRenewalDate(constantClock, credentials, 1));
+    }
+
+    @Test
+    public void getTokenRenewalDateShouldReturnMinWhenMultipleTokens() {
+        HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
+        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+        Credentials credentials = new Credentials();
+        TestDelegationTokenIdentifier tokenIdentifier1 = new TestDelegationTokenIdentifier(NOW);
+        credentials.addToken(
+                tokenService1, new TestDelegationToken(tokenService1, tokenIdentifier1));
+        TestDelegationTokenIdentifier tokenIdentifier2 = new TestDelegationTokenIdentifier(NOW + 1);
+        credentials.addToken(
+                tokenService2, new TestDelegationToken(tokenService2, tokenIdentifier2));
+
+        assertEquals(
+                Optional.of(NOW + 1), provider.getTokenRenewalDate(constantClock, credentials, 1));
+    }
+
+    @Test
+    public void getIssueDateShouldReturnIssueDateWithFutureToken() {
+        HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
+
+        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+        long issueDate = NOW + 1;
+        AbstractDelegationTokenIdentifier tokenIdentifier =
+                new TestDelegationTokenIdentifier(issueDate);
+
+        assertEquals(
+                issueDate,
+                provider.getIssueDate(
+                        constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
+    }
+
+    @Test
+    public void getIssueDateShouldReturnIssueDateWithPastToken() {
+        HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
+
+        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+        long issueDate = NOW - 1;
+        AbstractDelegationTokenIdentifier tokenIdentifier =
+                new TestDelegationTokenIdentifier(issueDate);
+
+        assertEquals(
+                issueDate,
+                provider.getIssueDate(
+                        constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
+    }
+
+    @Test
+    public void getIssueDateShouldReturnNowWithInvalidToken() {
+        HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
+
+        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+        long issueDate = -1;
+        AbstractDelegationTokenIdentifier tokenIdentifier =
+                new TestDelegationTokenIdentifier(issueDate);
+
+        assertEquals(
+                NOW,
+                provider.getIssueDate(
+                        constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
index fb35133bb19..1cac17dc221 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
@@ -101,7 +101,8 @@ public class KerberosDelegationTokenManagerITCase {
KerberosDelegationTokenManager delegationTokenManager =
new KerberosDelegationTokenManager(configuration, null, null);
- assertEquals(1, delegationTokenManager.delegationTokenProviders.size());
+ assertEquals(2, delegationTokenManager.delegationTokenProviders.size());
+ assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs"));
assertTrue(delegationTokenManager.isProviderLoaded("test"));
assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
assertFalse(delegationTokenManager.isProviderLoaded("throw"));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
new file mode 100644
index 00000000000..0d041d2b2a3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Minimal {@link AbstractDelegationTokenIdentifier} for integration tests. It carries nothing but
+ * an issue date, which is also the only value it serializes.
+ */
+public class TestDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
+
+    /** Kind reported by every instance of this identifier. */
+    private static final Text TOKEN_KIND = new Text("TEST_TOKEN_KIND");
+
+    private long issueDate;
+
+    /** No-argument constructor, needed for the service loader. */
+    public TestDelegationTokenIdentifier() {}
+
+    /**
+     * Creates an identifier with the given issue date.
+     *
+     * @param issueDate Issue date reported by {@link #getIssueDate()}.
+     */
+    public TestDelegationTokenIdentifier(long issueDate) {
+        this.issueDate = issueDate;
+    }
+
+    /** Serializes the issue date as a single long. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(this.issueDate);
+    }
+
+    /** Restores the issue date from a single long. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.issueDate = in.readLong();
+    }
+
+    @Override
+    public Text getKind() {
+        return TOKEN_KIND;
+    }
+
+    @Override
+    public long getIssueDate() {
+        return this.issueDate;
+    }
+}
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
new file mode 100644
index 00000000000..59e6d84d595
--- /dev/null
+++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.security.token.TestDelegationTokenIdentifier
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 5cba3a233bd..04b35098e8b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1294,7 +1294,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws Exception {
LOG.info("Adding delegation tokens to the AM container.");
- Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ Credentials credentials = new Credentials();
DelegationTokenManager delegationTokenManager =
new KerberosDelegationTokenManager(flinkConfiguration, null, null);