You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/07/21 13:00:29 UTC
[flink] branch master updated: [FLINK-28608][runtime][security]Make Hadoop FS token renewer configurable
This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0c5108cb42f [FLINK-28608][runtime][security]Make Hadoop FS token renewer configurable
0c5108cb42f is described below
commit 0c5108cb42fe9d891bfd9137fc79fb4d8653a51b
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Thu Jul 21 15:00:17 2022 +0200
[FLINK-28608][runtime][security]Make Hadoop FS token renewer configurable
---
.../token/HadoopFSDelegationTokenProvider.java | 10 +++++++++-
.../token/HadoopFSDelegationTokenProviderITCase.java | 19 +++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
index a3033a15a87..edf5e7be8c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
@@ -78,7 +78,7 @@ public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider
Clock clock = Clock.systemDefaultZone();
Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess();
- obtainDelegationTokens(null, fileSystemsToAccess, credentials);
+ obtainDelegationTokens(getRenewer(), fileSystemsToAccess, credentials);
// Get the token renewal interval if it is not set. It will be called only once.
if (tokenRenewalInterval == null) {
@@ -88,6 +88,13 @@ public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider
interval -> getTokenRenewalDate(clock, credentials, interval));
}
+ @VisibleForTesting
+ @Nullable
+ String getRenewer() {
+ return flinkConfiguration.getString(
+ String.format("security.kerberos.token.provider.%s.renewer", serviceName()), null);
+ }
+
private Set<FileSystem> getFileSystemsToAccess() throws IOException {
Set<FileSystem> result = new HashSet<>();
@@ -155,6 +162,7 @@ public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider
});
}
+ @VisibleForTesting
Optional<Long> getTokenRenewalInterval(Clock clock, Set<FileSystem> fileSystemsToAccess)
throws IOException {
// We cannot use the tokens generated with renewer yarn
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
index 0708069510b..f5e2db75100 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
@@ -35,6 +35,7 @@ import java.util.Set;
import static java.time.Instant.ofEpochMilli;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
/** Test for {@link HadoopFSDelegationTokenProvider}. */
class HadoopFSDelegationTokenProviderITCase {
@@ -66,6 +67,24 @@ class HadoopFSDelegationTokenProviderITCase {
}
}
+ @Test
+ public void getRenewerShouldReturnNullByDefault() throws Exception {
+ HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
+ provider.init(new org.apache.flink.configuration.Configuration());
+ assertNull(provider.getRenewer());
+ }
+
+ @Test
+ public void getRenewerShouldReturnConfiguredRenewer() throws Exception {
+ String renewer = "testRenewer";
+ HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
+ org.apache.flink.configuration.Configuration configuration =
+ new org.apache.flink.configuration.Configuration();
+ configuration.setString("security.kerberos.token.provider.hadoopfs.renewer", renewer);
+ provider.init(configuration);
+ assertEquals(renewer, provider.getRenewer());
+ }
+
@Test
public void getTokenRenewalIntervalShouldReturnNoneWhenNoTokens() throws IOException {
HadoopFSDelegationTokenProvider provider =