You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/07/21 13:00:29 UTC

[flink] branch master updated: [FLINK-28608][runtime][security]Make Hadoop FS token renewer configurable

This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c5108cb42f [FLINK-28608][runtime][security]Make Hadoop FS token renewer configurable
0c5108cb42f is described below

commit 0c5108cb42fe9d891bfd9137fc79fb4d8653a51b
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Thu Jul 21 15:00:17 2022 +0200

    [FLINK-28608][runtime][security]Make Hadoop FS token renewer configurable
---
 .../token/HadoopFSDelegationTokenProvider.java        | 10 +++++++++-
 .../token/HadoopFSDelegationTokenProviderITCase.java  | 19 +++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
index a3033a15a87..edf5e7be8c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
@@ -78,7 +78,7 @@ public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider
         Clock clock = Clock.systemDefaultZone();
         Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess();
 
-        obtainDelegationTokens(null, fileSystemsToAccess, credentials);
+        obtainDelegationTokens(getRenewer(), fileSystemsToAccess, credentials);
 
         // Get the token renewal interval if it is not set. It will be called only once.
         if (tokenRenewalInterval == null) {
@@ -88,6 +88,13 @@ public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider
                 interval -> getTokenRenewalDate(clock, credentials, interval));
     }
 
+    @VisibleForTesting
+    @Nullable
+    String getRenewer() { // null when the per-provider renewer option is not configured
+        String key = String.format("security.kerberos.token.provider.%s.renewer", serviceName());
+        return flinkConfiguration.getString(key, null);
+    }
+
     private Set<FileSystem> getFileSystemsToAccess() throws IOException {
         Set<FileSystem> result = new HashSet<>();
 
@@ -155,6 +162,7 @@ public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider
                 });
     }
 
+    @VisibleForTesting
     Optional<Long> getTokenRenewalInterval(Clock clock, Set<FileSystem> fileSystemsToAccess)
             throws IOException {
         // We cannot use the tokens generated with renewer yarn
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
index 0708069510b..f5e2db75100 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
@@ -35,6 +35,7 @@ import java.util.Set;
 
 import static java.time.Instant.ofEpochMilli;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 /** Test for {@link HadoopFSDelegationTokenProvider}. */
 class HadoopFSDelegationTokenProviderITCase {
@@ -66,6 +67,24 @@ class HadoopFSDelegationTokenProviderITCase {
         }
     }
 
+    @Test
+    public void getRenewerShouldReturnNullByDefault() throws Exception {
+        HadoopFSDelegationTokenProvider tokenProvider = new HadoopFSDelegationTokenProvider();
+        tokenProvider.init(new org.apache.flink.configuration.Configuration()); // no renewer key
+        assertNull(tokenProvider.getRenewer());
+    }
+
+    @Test
+    public void getRenewerShouldReturnConfiguredRenewer() throws Exception {
+        // The option key is written out literally so the test pins the exact key name.
+        org.apache.flink.configuration.Configuration flinkConf =
+                new org.apache.flink.configuration.Configuration();
+        flinkConf.setString("security.kerberos.token.provider.hadoopfs.renewer", "testRenewer");
+        HadoopFSDelegationTokenProvider tokenProvider = new HadoopFSDelegationTokenProvider();
+        tokenProvider.init(flinkConf);
+        assertEquals("testRenewer", tokenProvider.getRenewer());
+    }
+
     @Test
     public void getTokenRenewalIntervalShouldReturnNoneWhenNoTokens() throws IOException {
         HadoopFSDelegationTokenProvider provider =