Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/26 12:06:45 UTC

[GitHub] [flink] gaborgsomogyi opened a new pull request, #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

gaborgsomogyi opened a new pull request, #19825:
URL: https://github.com/apache/flink/pull/19825

   ## What is the purpose of the change
   
   The Kerberos delegation token framework currently cannot obtain tokens from token providers, nor periodically re-obtain them. This PR adds that functionality.
   
   ## Brief change log
   
   * Added `security.kerberos.tokens.renewal-ratio` with default 0.75
   * Added `security.kerberos.tokens.retry-wait` with default 1 hour
   * Added one-time delegation token obtaining in `obtainDelegationTokens`
   * Added recurring delegation token obtaining, started in `start`
   * Minor renames and fixes
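   A minimal sketch of how the two new options could drive the recurring obtain. It assumes two things. First, after a successful obtain, the next attempt is scheduled after `renewal-ratio` times the time left until the earliest renewal deadline. Second, after a failure, the next attempt waits `retry-wait`. The class and method names are hypothetical and are not the PR's code.

   ```java
   import java.time.Duration;

   /** Hypothetical helper illustrating renewal-ratio and retry-wait; not the PR's actual code. */
   final class TokenRenewalScheduleSketch {

       private final double renewalRatio; // assumed to come from security.kerberos.tokens.renewal-ratio
       private final Duration retryWait; // assumed to come from security.kerberos.tokens.retry-wait

       TokenRenewalScheduleSketch(double renewalRatio, Duration retryWait) {
           if (renewalRatio <= 0 || renewalRatio > 1) {
               throw new IllegalArgumentException("renewalRatio must be in (0, 1]");
           }
           this.renewalRatio = renewalRatio;
           this.retryWait = retryWait;
       }

       /** Delay before the next obtain after a success: a fraction of the time left until renewal. */
       long renewalDelayMs(long nowMs, long nextRenewalMs) {
           long remainingMs = Math.max(0L, nextRenewalMs - nowMs);
           return Math.round(remainingMs * renewalRatio);
       }

       /** Delay before the next obtain after a failure. */
       long retryDelayMs() {
           return retryWait.toMillis();
       }

       public static void main(String[] args) {
           TokenRenewalScheduleSketch s = new TokenRenewalScheduleSketch(0.75, Duration.ofHours(1));
           // Token must be renewed 24 h from now, so re-obtain after 18 h.
           System.out.println(s.renewalDelayMs(0L, Duration.ofHours(24).toMillis())); // 64800000
           System.out.println(s.retryDelayMs()); // 3600000
       }
   }
   ```

   With the defaults (0.75 and 1 hour), tokens that must be renewed in 24 hours would be re-obtained after 18 hours. A failed attempt would be retried after one hour.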
   
   ## Verifying this change
   
   * Additional unit tests
   * Manually on YARN and K8S (please see an example [here](https://gist.github.com/gaborgsomogyi/ac4f71ead8494da2f5c35265bcb1e885))
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? All documentation will be added in [FLINK-25911](https://issues.apache.org/jira/browse/FLINK-25911) once everything works end to end
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r893245619


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -127,8 +170,62 @@ boolean isProviderLoaded(String serviceName) {
      * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
      */
     @Override
-    public void obtainDelegationTokens(Credentials credentials) {
+    public void obtainDelegationTokens(Credentials credentials) throws Exception {
         LOG.info("Obtaining delegation tokens");
+
+        // Delegation tokens can only be obtained if the real user has Kerberos credentials, so
+        // skip creation when those are not available.
+        if (kerberosLoginProvider.isLoginPossible()) {
+            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+            freshUGI.doAs(
+                    (PrivilegedExceptionAction<Void>)
+                            () -> {
+                                obtainDelegationTokensAndGetNextRenewal(credentials);
+                                return null;
+                            });
+            LOG.info("Delegation tokens obtained successfully");
+        } else {
+            LOG.info("Real user has no kerberos credentials so no tokens obtained");
+        }
+    }
+
+    protected long obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {

Review Comment:
   Changed.





[GitHub] [flink] mbalassi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
mbalassi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1138539150

   Thanks, @gaborgsomogyi. I glanced through the log and it looks good at first sight. I will be able to give you detailed feedback early next week.




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r893378721


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -127,8 +170,62 @@ boolean isProviderLoaded(String serviceName) {
      * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
      */
     @Override
-    public void obtainDelegationTokens(Credentials credentials) {
+    public void obtainDelegationTokens(Credentials credentials) throws Exception {
         LOG.info("Obtaining delegation tokens");
+
+        // Delegation tokens can only be obtained if the real user has Kerberos credentials, so
+        // skip creation when those are not available.
+        if (kerberosLoginProvider.isLoginPossible()) {
+            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+            freshUGI.doAs(
+                    (PrivilegedExceptionAction<Void>)
+                            () -> {
+                                obtainDelegationTokensAndGetNextRenewal(credentials);
+                                return null;
+                            });
+            LOG.info("Delegation tokens obtained successfully");
+        } else {
+            LOG.info("Real user has no kerberos credentials so no tokens obtained");
+        }
+    }
+
+    protected long obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {
+        AtomicLong nextRenewal = new AtomicLong(Long.MAX_VALUE);
+
+        delegationTokenProviders
+                .values()
+                .forEach(

Review Comment:
   Fixed.





[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1151508412

   @flinkbot run azure




[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1138492782

   Here is an example run log from K8S.
   [token.log](https://github.com/apache/flink/files/8778806/token.log)
   




[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1138548677

   @mbalassi sure, thanks in advance! This is a very hairy topic, so it takes time to understand all of its aspects.




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r893239238


##########
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java:
##########
@@ -132,6 +132,22 @@ public class SecurityOptions {
                     .withDescription(
                             "The time period when keytab login happens automatically in order to always have a valid TGT.");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Duration> KERBEROS_TOKENS_RETRY_WAIT =
+            key("security.kerberos.tokens.retry-wait")

Review Comment:
   Renamed to `security.kerberos.tokens.retry.backoff`.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r893246821


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -127,8 +170,62 @@ boolean isProviderLoaded(String serviceName) {
      * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
      */
     @Override
-    public void obtainDelegationTokens(Credentials credentials) {
+    public void obtainDelegationTokens(Credentials credentials) throws Exception {
         LOG.info("Obtaining delegation tokens");
+
+        // Delegation tokens can only be obtained if the real user has Kerberos credentials, so
+        // skip creation when those are not available.
+        if (kerberosLoginProvider.isLoginPossible()) {
+            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+            freshUGI.doAs(
+                    (PrivilegedExceptionAction<Void>)
+                            () -> {
+                                obtainDelegationTokensAndGetNextRenewal(credentials);
+                                return null;
+                            });
+            LOG.info("Delegation tokens obtained successfully");
+        } else {
+            LOG.info("Real user has no kerberos credentials so no tokens obtained");
+        }
+    }
+
+    protected long obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {
+        AtomicLong nextRenewal = new AtomicLong(Long.MAX_VALUE);
+
+        delegationTokenProviders
+                .values()
+                .forEach(

Review Comment:
   I tried this in the initial implementation, but it simply did not work. I don't know why, so I'm re-checking...





[GitHub] [flink] gyfora commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r894288624


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -173,27 +274,95 @@ void startTGTRenewal() throws IOException {
                                                     LOG.warn("Error while renewing TGT", e);
                                                 }
                                             }),
-                            tgtRenewalPeriod,
+                            0,
                             tgtRenewalPeriod,
                             TimeUnit.MILLISECONDS);
-            LOG.debug("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
+            LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
         } else {
-            LOG.debug("TGT renewal task not started");
+            LOG.info("TGT renewal task not started");
         }
     }
 
+    @VisibleForTesting
     void stopTGTRenewal() {
         if (tgtRenewalFuture != null) {
             tgtRenewalFuture.cancel(true);
             tgtRenewalFuture = null;
         }
     }
 
+    @VisibleForTesting
+    void startTokensUpdate() {
+        try {
+            LOG.info("Starting tokens update task");

Review Comment:
   I agree: if they contain the same check/logic, we should keep only `kerberosLoginProvider`.





[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1152192787

   I've rebased onto the latest master because the Jenkins job has several unrelated failures.




[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1151335158

   @flinkbot run azure




[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1152884441

   The FLINK-27792 issue occurs consistently in the Jenkins jobs.




[GitHub] [flink] flinkbot commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1138493119

   ## CI report:
   
   * 763380b0c5c0a55fcda39a4211f6bd8f56c706c7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1140839121

   @JackWangCS FYI, you can implement and test FLINK-25909 on top of this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1152982014

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] gyfora commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r894241922


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -173,27 +274,95 @@ void startTGTRenewal() throws IOException {
                                                     LOG.warn("Error while renewing TGT", e);
                                                 }
                                             }),
-                            tgtRenewalPeriod,
+                            0,
                             tgtRenewalPeriod,
                             TimeUnit.MILLISECONDS);
-            LOG.debug("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
+            LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
         } else {
-            LOG.debug("TGT renewal task not started");
+            LOG.info("TGT renewal task not started");
         }
     }
 
+    @VisibleForTesting
     void stopTGTRenewal() {
         if (tgtRenewalFuture != null) {
             tgtRenewalFuture.cancel(true);
             tgtRenewalFuture = null;
         }
     }
 
+    @VisibleForTesting
+    void startTokensUpdate() {
+        try {
+            LOG.info("Starting tokens update task");

Review Comment:
   Should we check `isLoginPossible` before starting this logic?





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r894259363


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -173,27 +274,95 @@ void startTGTRenewal() throws IOException {
                                                     LOG.warn("Error while renewing TGT", e);
                                                 }
                                             }),
-                            tgtRenewalPeriod,
+                            0,
                             tgtRenewalPeriod,
                             TimeUnit.MILLISECONDS);
-            LOG.debug("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
+            LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
         } else {
-            LOG.debug("TGT renewal task not started");
+            LOG.info("TGT renewal task not started");
         }
     }
 
+    @VisibleForTesting
     void stopTGTRenewal() {
         if (tgtRenewalFuture != null) {
             tgtRenewalFuture.cancel(true);
             tgtRenewalFuture = null;
         }
     }
 
+    @VisibleForTesting
+    void startTokensUpdate() {
+        try {
+            LOG.info("Starting tokens update task");

Review Comment:
   This is an internal function and is not intended to be used from the outside.
   In `start` function there is a check:
   ```
           if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
               LOG.info("Renewal is NOT possible, skipping to start renewal task");
               return;
           }
   ```
   What started to bother me, though, is that `kerberosRenewalPossibleProvider.isRenewalPossible` contains more or less the same code as `kerberosLoginProvider.isLoginPossible`.
   
   I think we should add debug messages to `kerberosLoginProvider.isLoginPossible` and delete `kerberosRenewalPossibleProvider` entirely. WDYT?





[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1141040755

   @flinkbot run azure




[GitHub] [flink] gyfora commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r893220794


##########
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java:
##########
@@ -132,6 +132,22 @@ public class SecurityOptions {
                     .withDescription(
                             "The time period when keytab login happens automatically in order to always have a valid TGT.");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Duration> KERBEROS_TOKENS_RETRY_WAIT =
+            key("security.kerberos.tokens.retry-wait")

Review Comment:
   I think a better name would be `security.kerberos.tokens.obtain.retry.backoff` or `security.kerberos.tokens.retry.backoff`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -127,8 +170,62 @@ boolean isProviderLoaded(String serviceName) {
      * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
      */
     @Override
-    public void obtainDelegationTokens(Credentials credentials) {
+    public void obtainDelegationTokens(Credentials credentials) throws Exception {
         LOG.info("Obtaining delegation tokens");
+
+        // Delegation tokens can only be obtained if the real user has Kerberos credentials, so
+        // skip creation when those are not available.
+        if (kerberosLoginProvider.isLoginPossible()) {
+            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+            freshUGI.doAs(
+                    (PrivilegedExceptionAction<Void>)
+                            () -> {
+                                obtainDelegationTokensAndGetNextRenewal(credentials);
+                                return null;
+                            });
+            LOG.info("Delegation tokens obtained successfully");
+        } else {
+            LOG.info("Real user has no kerberos credentials so no tokens obtained");
+        }
+    }
+
+    protected long obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {
+        AtomicLong nextRenewal = new AtomicLong(Long.MAX_VALUE);
+
+        delegationTokenProviders
+                .values()
+                .forEach(

Review Comment:
   You could use a stream: flatMap each provider to its renewal time, then take the min. That would get rid of the AtomicLong-based min logic.
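   A sketch of the suggested stream-based min. It assumes each provider returns an `Optional<Long>` renewal time. The `TokenProvider` interface here is a hypothetical stand-in, and Flink's real provider signature may differ. Empty results are skipped, and `Long.MAX_VALUE` is kept as the "no renewal" value, matching the original `AtomicLong` initial value. `filter(Optional::isPresent)` plus `mapToLong(Optional::get)` is the Java 8 form of `flatMap(Optional::stream)`, which needs Java 9.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Optional;

   final class StreamMinSketch {

       /** Hypothetical stand-in for a delegation token provider. */
       interface TokenProvider {
           Optional<Long> obtainAndGetRenewalTime();
       }

       /** Returns the earliest renewal time, or Long.MAX_VALUE if no provider reports one. */
       static long earliestRenewal(List<TokenProvider> providers) {
           return providers.stream()
                   .map(TokenProvider::obtainAndGetRenewalTime)
                   .filter(Optional::isPresent) // drop providers without a renewal time
                   .mapToLong(Optional::get)
                   .min()
                   .orElse(Long.MAX_VALUE);
       }

       public static void main(String[] args) {
           List<TokenProvider> providers =
                   Arrays.<TokenProvider>asList(
                           () -> Optional.of(500L), Optional::empty, () -> Optional.of(200L));
           System.out.println(earliestRenewal(providers)); // 200
       }
   }
   ```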



##########
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java:
##########
@@ -132,6 +132,22 @@ public class SecurityOptions {
                     .withDescription(
                             "The time period when keytab login happens automatically in order to always have a valid TGT.");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Duration> KERBEROS_TOKENS_RETRY_WAIT =
+            key("security.kerberos.tokens.retry-wait")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "The time period how long to wait before retrying to obtain new delegation tokens after a failure.");
+
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Double> KERBEROS_TOKENS_RENEWAL_RATIO =
+            key("security.kerberos.tokens.renewal-ratio")

Review Comment:
   A clearer name would be `security.kerberos.tokens.renewal.time-ratio`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -127,8 +170,62 @@ boolean isProviderLoaded(String serviceName) {
      * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
      */
     @Override
-    public void obtainDelegationTokens(Credentials credentials) {
+    public void obtainDelegationTokens(Credentials credentials) throws Exception {
         LOG.info("Obtaining delegation tokens");
+
+        // Delegation tokens can only be obtained if the real user has Kerberos credentials, so
+        // skip creation when those are not available.
+        if (kerberosLoginProvider.isLoginPossible()) {
+            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+            freshUGI.doAs(
+                    (PrivilegedExceptionAction<Void>)
+                            () -> {
+                                obtainDelegationTokensAndGetNextRenewal(credentials);
+                                return null;
+                            });
+            LOG.info("Delegation tokens obtained successfully");
+        } else {
+            LOG.info("Real user has no kerberos credentials so no tokens obtained");
+        }
+    }
+
+    protected long obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {

Review Comment:
   This could return `Optional<Long>` instead, to make the "no renewal time" case explicit.
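   A sketch of what an `Optional<Long>` return could look like, assuming the per-provider renewal times are already collected. The method names are hypothetical, and so is the caller-side handling of an empty result. An empty `Optional` replaces the `Long.MAX_VALUE` sentinel, so the caller only computes a delay when a renewal time is actually known.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.Optional;

   final class OptionalRenewalSketch {

       /** Hypothetical variant of obtainDelegationTokensAndGetNextRenewal returning Optional. */
       static Optional<Long> nextRenewal(List<Optional<Long>> renewalTimesPerProvider) {
           return renewalTimesPerProvider.stream()
                   .filter(Optional::isPresent)
                   .map(Optional::get)
                   .min(Long::compare); // empty when no provider reported a renewal time
       }

       /** Caller side: compute a delay only when a renewal time exists. */
       static String describeSchedule(Optional<Long> nextRenewal, long nowMs, double ratio) {
           return nextRenewal
                   .map(t -> "re-obtain in " + Math.round(Math.max(0L, t - nowMs) * ratio) + " ms")
                   .orElse("no renewal scheduled");
       }

       public static void main(String[] args) {
           Optional<Long> some = nextRenewal(Arrays.asList(Optional.of(1000L), Optional.<Long>empty()));
           Optional<Long> none = nextRenewal(Collections.<Optional<Long>>emptyList());
           System.out.println(describeSchedule(some, 0L, 0.75)); // re-obtain in 750 ms
           System.out.println(describeSchedule(none, 0L, 0.75)); // no renewal scheduled
       }
   }
   ```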





[GitHub] [flink] gyfora commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r893249510


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -127,8 +170,62 @@ boolean isProviderLoaded(String serviceName) {
      * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
      */
     @Override
-    public void obtainDelegationTokens(Credentials credentials) {
+    public void obtainDelegationTokens(Credentials credentials) throws Exception {
         LOG.info("Obtaining delegation tokens");
+
+        // Delegation tokens can only be obtained if the real user has Kerberos credentials, so
+        // skip creation when those are not available.
+        if (kerberosLoginProvider.isLoginPossible()) {
+            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
+            freshUGI.doAs(
+                    (PrivilegedExceptionAction<Void>)
+                            () -> {
+                                obtainDelegationTokensAndGetNextRenewal(credentials);
+                                return null;
+                            });
+            LOG.info("Delegation tokens obtained successfully");
+        } else {
+            LOG.info("Real user has no kerberos credentials so no tokens obtained");
+        }
+    }
+
+    protected long obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {
+        AtomicLong nextRenewal = new AtomicLong(Long.MAX_VALUE);
+
+        delegationTokenProviders
+                .values()
+                .forEach(

Review Comment:
   If you paste the code here we can probably spot the problem together.
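In the quoted hunk, `obtainDelegationTokensAndGetNextRenewal` seeds an `AtomicLong` with `Long.MAX_VALUE` and then iterates over `delegationTokenProviders.values()` with `forEach`; the loop body is cut off in the quote. A mutable holder such as `AtomicLong` is needed there because a lambda cannot reassign a local variable. A minimal sketch of that accumulation pattern, assuming the loop keeps the smallest renewal time reported by any provider (the map of provider names to timestamps is a hypothetical stand-in for the real providers):

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class MinRenewalSketch {
    // Folds per-provider renewal timestamps into the smallest one.
    // Long.MAX_VALUE remains when the map is empty, so callers must treat it as "no renewal".
    static long earliestRenewal(Map<String, Long> renewalByProvider) {
        AtomicLong nextRenewal = new AtomicLong(Long.MAX_VALUE);
        renewalByProvider
                .values()
                .forEach(renewal -> nextRenewal.accumulateAndGet(renewal, Math::min));
        return nextRenewal.get();
    }
}
```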



[GitHub] [flink] gyfora commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r894292459


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -173,27 +274,95 @@ void startTGTRenewal() throws IOException {
                                                     LOG.warn("Error while renewing TGT", e);
                                                 }
                                             }),
-                            tgtRenewalPeriod,
+                            0,
                             tgtRenewalPeriod,
                             TimeUnit.MILLISECONDS);
-            LOG.debug("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
+            LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
         } else {
-            LOG.debug("TGT renewal task not started");
+            LOG.info("TGT renewal task not started");
         }
     }
 
+    @VisibleForTesting
     void stopTGTRenewal() {
         if (tgtRenewalFuture != null) {
             tgtRenewalFuture.cancel(true);
             tgtRenewalFuture = null;
         }
     }
 
+    @VisibleForTesting
+    void startTokensUpdate() {
+        try {
+            LOG.info("Starting tokens update task");

Review Comment:
   Looks good.
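The hunk changes the initial delay of the TGT renewal task from `tgtRenewalPeriod` to `0`, so the first renewal runs immediately at start-up instead of one full period later, and `stopTGTRenewal` cancels the returned future. A minimal sketch of that start/stop pattern with a plain `ScheduledExecutorService`; the class name, the `renewTgt` runnable and the period are placeholders, not Flink code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PeriodicRenewalSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> renewalFuture;

    // Initial delay 0: the first run happens right away, then every periodMs.
    void start(Runnable renewTgt, long periodMs) {
        renewalFuture =
                scheduler.scheduleAtFixedRate(
                        () -> {
                            try {
                                renewTgt.run();
                            } catch (Exception e) {
                                // An exception escaping a periodic task suppresses all later
                                // executions, so report it and keep the schedule alive.
                                System.err.println("Error while renewing TGT: " + e);
                            }
                        },
                        0,
                        periodMs,
                        TimeUnit.MILLISECONDS);
    }

    // Mirrors stopTGTRenewal(): cancel (interrupting a running renewal) and drop the future.
    void stop() {
        if (renewalFuture != null) {
            renewalFuture.cancel(true);
            renewalFuture = null;
        }
        scheduler.shutdownNow();
    }
}
```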



[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1153206321

   In the meantime I found a real unit test failure introduced by this PR: `HadoopReduceFunctionITCase` failed because of a dependency problem. Namely, `HdfsConfiguration` was needed in core because `isProxyUser` had been added to a class in `flink-hadoop-fs`. Now only the originally planned `hadoop-common` dependency is needed, as expected, because I've moved `isProxyUser` to `HadoopUserUtils`.


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r893241050


##########
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java:
##########
@@ -132,6 +132,22 @@ public class SecurityOptions {
                     .withDescription(
                             "The time period when keytab login happens automatically in order to always have a valid TGT.");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Duration> KERBEROS_TOKENS_RETRY_WAIT =
+            key("security.kerberos.tokens.retry-wait")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "The time period how long to wait before retrying to obtain new delegation tokens after a failure.");
+
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Double> KERBEROS_TOKENS_RENEWAL_RATIO =
+            key("security.kerberos.tokens.renewal-ratio")

Review Comment:
   Renamed.
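The two options in this hunk decide when tokens are obtained again: after a success, the next attempt is scheduled after a fraction (`renewal-ratio`, default 0.75) of the time left until the earliest renewal timestamp; after a failure, the manager waits `retry-wait` (default 1 hour). A minimal sketch of that arithmetic, assuming the delay is computed as `ratio * (nextRenewal - now)`; the class and helper names are hypothetical and the PR's exact formula may differ:

```java
import java.time.Duration;

class RenewalDelaySketch {
    // Assumed defaults, matching the option defaults in the diff above.
    static final double RENEWAL_RATIO = 0.75;
    static final Duration RETRY_WAIT = Duration.ofHours(1);

    // Delay before the next obtain after a successful one:
    // a fraction of the time remaining until the earliest renewal timestamp.
    static long renewalDelayMs(long nowMs, long nextRenewalMs, double ratio) {
        return Math.round(ratio * (nextRenewalMs - nowMs));
    }

    // Delay before retrying after a failed obtain: the configured retry wait.
    static long retryDelayMs() {
        return RETRY_WAIT.toMillis();
    }
}
```

For example, with the default ratio of 0.75, a token whose renewal is due in 24 hours would be re-obtained after 18 hours, leaving a quarter of the validity window as a safety margin.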



[GitHub] [flink] gyfora commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r893419986


##########
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java:
##########
@@ -132,6 +132,22 @@ public class SecurityOptions {
                     .withDescription(
                             "The time period when keytab login happens automatically in order to always have a valid TGT.");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Duration> KERBEROS_TOKENS_RETRY_BACKOFF =
+            key("security.kerberos.tokens.retry.backoff")

Review Comment:
   Sorry, just to connect the two configs: maybe even better would be `security.kerberos.tokens.renewal.retry.backoff`, so that they share a common prefix.



[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r893454486


##########
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java:
##########
@@ -132,6 +132,22 @@ public class SecurityOptions {
                     .withDescription(
                             "The time period when keytab login happens automatically in order to always have a valid TGT.");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Duration> KERBEROS_TOKENS_RETRY_BACKOFF =
+            key("security.kerberos.tokens.retry.backoff")

Review Comment:
   Changed.



[GitHub] [flink] gyfora merged pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #19825:
URL: https://github.com/apache/flink/pull/19825


[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1152884459

   @flinkbot run azure


[GitHub] [flink] JackWangCS commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1153324680

   > @JackWangCS FYI, on top of this PR you can implement and test [FLINK-25909](https://issues.apache.org/jira/browse/FLINK-25909).
   
   Thanks, I will start working on the ticket.


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19825:
URL: https://github.com/apache/flink/pull/19825#discussion_r894270294


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -173,27 +274,95 @@ void startTGTRenewal() throws IOException {
                                                     LOG.warn("Error while renewing TGT", e);
                                                 }
                                             }),
-                            tgtRenewalPeriod,
+                            0,
                             tgtRenewalPeriod,
                             TimeUnit.MILLISECONDS);
-            LOG.debug("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
+            LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
         } else {
-            LOG.debug("TGT renewal task not started");
+            LOG.info("TGT renewal task not started");
         }
     }
 
+    @VisibleForTesting
     void stopTGTRenewal() {
         if (tgtRenewalFuture != null) {
             tgtRenewalFuture.cancel(true);
             tgtRenewalFuture = null;
         }
     }
 
+    @VisibleForTesting
+    void startTokensUpdate() {
+        try {
+            LOG.info("Starting tokens update task");

Review Comment:
   I've made the changes; please share your opinion if you have a better idea.
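Unlike the fixed-rate TGT renewal, a token update task whose next run depends on the outcome of the current one (renewal-ratio delay on success, retry backoff on failure) fits one-shot rescheduling better than `scheduleAtFixedRate`. A minimal sketch of such a self-rescheduling `startTokensUpdate`, assuming a hypothetical `LongSupplier` that obtains tokens and returns the delay until the next run; the class, field names and delays are illustrative, and the PR may structure this differently:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

class TokensUpdateSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Hypothetical: obtains tokens and returns the delay (ms) until the next update.
    private final LongSupplier obtainTokensAndGetDelayMs;
    private final long retryBackoffMs;

    TokensUpdateSketch(LongSupplier obtainTokensAndGetDelayMs, long retryBackoffMs) {
        this.obtainTokensAndGetDelayMs = obtainTokensAndGetDelayMs;
        this.retryBackoffMs = retryBackoffMs;
    }

    void startTokensUpdate() {
        long delayMs;
        try {
            delayMs = obtainTokensAndGetDelayMs.getAsLong();
        } catch (RuntimeException e) {
            // On failure, try again after the configured backoff.
            System.err.println("Error while updating tokens, retrying: " + e);
            delayMs = retryBackoffMs;
        }
        try {
            // One-shot schedule: each run decides when the following run happens.
            scheduler.schedule(this::startTokensUpdate, delayMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // stop() was called; do not reschedule.
        }
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```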



[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1152972312

   @flinkbot run azure


[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1152406301

   @flinkbot run azure


[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1138489973

   cc @mbalassi 


[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1138838423

   After a minor fix, here is the TGT-based log.
   [token_tgt.log](https://github.com/apache/flink/files/8780948/token_tgt.log)
   


[GitHub] [flink] gaborgsomogyi commented on pull request #19825: [FLINK-27171][runtime][security] Add periodic kerberos delegation token obtain possibility to DelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19825:
URL: https://github.com/apache/flink/pull/19825#issuecomment-1139506508

   @flinkbot run azure

