Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/06 08:02:19 UTC

[GitHub] [flink] gaborgsomogyi opened a new pull request, #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi opened a new pull request, #19372:
URL: https://github.com/apache/flink/pull/19372

   ## What is the purpose of the change
   
   `KerberosDelegationTokenManager` needs a valid TGT in order to obtain delegation tokens. In this PR I've added periodic Kerberos relogin, which runs when Kerberos is properly configured.
   
   ## Brief change log
   
   * Added the `security.kerberos.relogin.period` config option with a default of 1 minute.
   * Added a TGT renewal thread that runs when the user logs in from a keytab.
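
The renewal loop described in the change log can be sketched with plain JDK executors. This is a minimal sketch under stated assumptions, not the PR's code: `ReloginTarget` is a hypothetical stand-in for the two `UserGroupInformation` methods the diff calls (`isFromKeytab()` and `checkTGTAndReloginFromKeytab()`), and `PeriodicRelogin` is a hypothetical class. Following the later revision of the diff, the scheduled executor only triggers each tick, while the potentially blocking relogin runs on a separate executor.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the two UserGroupInformation methods used by the renewal task. */
interface ReloginTarget {
    boolean isFromKeytab();

    void checkTGTAndReloginFromKeytab() throws Exception;
}

/** Hypothetical sketch of periodic keytab relogin; not Flink's implementation. */
class PeriodicRelogin {
    // Mirrors the documented default of security.kerberos.relogin.period.
    static final Duration DEFAULT_RELOGIN_PERIOD = Duration.ofMinutes(1);

    private final ScheduledExecutorService scheduledExecutor;
    private final ExecutorService ioExecutor;
    private ScheduledFuture<?> tgtRenewalFuture;

    PeriodicRelogin(ScheduledExecutorService scheduledExecutor, ExecutorService ioExecutor) {
        this.scheduledExecutor = scheduledExecutor;
        this.ioExecutor = ioExecutor;
    }

    /** Schedules renewal for keytab logins; returns false if nothing was scheduled. */
    synchronized boolean start(ReloginTarget currentUser, Duration period) {
        if (tgtRenewalFuture != null) {
            throw new IllegalStateException("Manager is already started");
        }
        if (!currentUser.isFromKeytab()) {
            return false; // only keytab-based logins are renewed by this task
        }
        long periodMillis = period.toMillis();
        tgtRenewalFuture =
                scheduledExecutor.scheduleAtFixedRate(
                        // The scheduler thread only hands the work off, since relogin may block.
                        () ->
                                ioExecutor.execute(
                                        () -> {
                                            try {
                                                currentUser.checkTGTAndReloginFromKeytab();
                                            } catch (Exception e) {
                                                // Keep going: the next tick retries.
                                                System.err.println("TGT renewal failed: " + e);
                                            }
                                        }),
                        periodMillis,
                        periodMillis,
                        TimeUnit.MILLISECONDS);
        return true;
    }

    /** Cancels future renewals; a relogin already handed off may still finish. */
    synchronized void stop() {
        if (tgtRenewalFuture != null) {
            tgtRenewalFuture.cancel(false);
            tgtRenewalFuture = null;
        }
    }
}
```

In a real setup the period would come from `security.kerberos.relogin.period`, which the diff reads via `configuration.get(KERBEROS_RELOGIN_PERIOD)`.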
   
   ## Verifying this change
   
   * Additional unit tests
   * Manually on YARN cluster
   * Please be aware that K8S is not tested
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? All documentation is intended to be added in [FLINK-25911](https://issues.apache.org/jira/browse/FLINK-25911) when everything works as a whole
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849582652


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;
+
+    private ScheduledFuture<?> tgtRenewalFuture;
+
+    public KerberosDelegationTokenManager(
+            Configuration configuration,
+            @Nullable ScheduledExecutor scheduledExecutor,
+            @Nullable ExecutorService executorService) {

Review Comment:
   There are basically two use cases for DT creation:
   * Single obtain
   * DTM start, where automatic obtain + propagation happens on a recurring basis
   
   These two use cases require different internals, so both belong in the interface.
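
A simplified sketch of an interface covering both use cases follows. It is only an illustration under stated assumptions: `TokenContainer` is a hypothetical stand-in for Hadoop's `Credentials`, `SketchTokenManager` is hypothetical, and the `propagator` callback stands in for whatever mechanism delivers tokens to task managers. Single obtain fills a caller-provided container once; `start()` reuses the same obtain logic on a schedule and pushes each fresh result out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for Hadoop's Credentials: service name -> token. */
class TokenContainer {
    final Map<String, String> tokens = new HashMap<>();
}

/** Simplified sketch of a delegation token manager covering both use cases. */
interface TokenManager {
    /** Use case 1: obtain tokens once into a caller-provided container. */
    void obtainDelegationTokens(TokenContainer credentials) throws Exception;

    /** Use case 2: obtain and propagate tokens on a recurring basis. */
    void start() throws Exception;

    void stop();
}

/** Hypothetical implementation; the recurring path reuses the single-obtain logic. */
class SketchTokenManager implements TokenManager {
    private final ScheduledExecutorService scheduler;
    private final long periodMillis;
    private final Consumer<TokenContainer> propagator; // stands in for delivery to task managers
    private ScheduledFuture<?> future;
    private int obtainCount;

    SketchTokenManager(
            ScheduledExecutorService scheduler,
            long periodMillis,
            Consumer<TokenContainer> propagator) {
        this.scheduler = scheduler;
        this.periodMillis = periodMillis;
        this.propagator = propagator;
    }

    @Override
    public synchronized void obtainDelegationTokens(TokenContainer credentials) {
        obtainCount++;
        // A real provider would contact a service such as HDFS; this fakes a token.
        credentials.tokens.put("example-service", "token-" + obtainCount);
    }

    @Override
    public synchronized void start() {
        if (future != null) {
            throw new IllegalStateException("Manager is already started");
        }
        future =
                scheduler.scheduleAtFixedRate(
                        () -> {
                            TokenContainer fresh = new TokenContainer();
                            try {
                                obtainDelegationTokens(fresh);
                                propagator.accept(fresh);
                            } catch (RuntimeException e) {
                                System.err.println("Obtain failed, retrying next period: " + e);
                            }
                        },
                        0,
                        periodMillis,
                        TimeUnit.MILLISECONDS);
    }

    @Override
    public synchronized void stop() {
        if (future != null) {
            future.cancel(false);
            future = null;
        }
    }
}
```

The two entry points share the obtain logic but differ in lifecycle: one is a plain call, the other owns a scheduled task that must be started once and stopped.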
   

[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r854030684


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   @dmvk did you have a chance to consider the situation in depth? I have my own suggestion, which may or may not match your opinion. Namely, if it's not horribly complex, my preference is upgrading to Mockito 3.4.0 and choosing bullet point 1. I have no idea why our Mockito version is so old; maybe there was no agreement to upgrade it? If you have some insights, please share.
   
   If that would be overkill, then I would vote for bullet point 3, because all the other options would add hard-to-maintain and brittle solutions.
   
   WDYT?
   

[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1104866261

   @flinkbot run azure


[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1098003872

   I've squashed everything into a single commit.


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849691193


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   I've had a deeper look; here is a summary of my findings:
   * No mocking framework is allowed
   * The `UserGroupInformation` class has no public constructor, so an instance can only be created via reflection, which I'm pretty sure won't initialize it properly. As a result, every place where `UserGroupInformation` is used would need to be hacked around.
   * If we go in the `KerberosClient` direction, we would see functions like `KerberosClient.hasCurrentUserKerberosCredentials()` instead of `UserGroupInformation.getCurrentUser().hasKerberosCredentials()`, which is hacky but doable
   * But as soon as the condition gets complicated, for example `Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials()`, how should the `KerberosClient` function be named?
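
For readers less used to Scala, the expression in the last bullet picks the real user when the current user is a proxy user and the current user otherwise, then asks whether that user holds Kerberos credentials. A Java rendering is sketched below; `UgiLike`, `SimpleUser` and `KerberosChecks` are hypothetical stand-ins, assuming (as in `UserGroupInformation`) that `getRealUser()` returns null for a user that is not a proxy user.

```java
import java.util.Optional;

/** Hypothetical stand-in for the two UserGroupInformation methods used in the expression. */
interface UgiLike {
    /** Returns the real user behind a proxy user, or null if this is not a proxy user. */
    UgiLike getRealUser();

    boolean hasKerberosCredentials();
}

/** Hypothetical immutable test double. */
class SimpleUser implements UgiLike {
    private final UgiLike realUser;
    private final boolean kerberosCredentials;

    SimpleUser(UgiLike realUser, boolean kerberosCredentials) {
        this.realUser = realUser;
        this.kerberosCredentials = kerberosCredentials;
    }

    @Override
    public UgiLike getRealUser() {
        return realUser;
    }

    @Override
    public boolean hasKerberosCredentials() {
        return kerberosCredentials;
    }
}

class KerberosChecks {
    /** Java form of Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials(). */
    static boolean realUserOrElseCurrentUserHasKerberosCredentials(UgiLike currentUser) {
        return Optional.ofNullable(currentUser.getRealUser())
                .orElse(currentUser)
                .hasKerberosCredentials();
    }
}
```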
   
   In Spark and other components I've tried to mock, reimplement, modify, or otherwise make `UserGroupInformation` testable, without any success. I think we have the same situation here unless you have a clear, doable suggestion.
   
   I think realistically we have the following possibilities for this case:
   * Mock the static `UserGroupInformation.getCurrentUser()` function and return a mocked `UserGroupInformation` instance -> here the PowerMock runner simply doesn't work with JUnit 5, and our Mockito version is too old to mock static functions. All in all, only a Mockito version upgrade could be a potential solution here.
   * Use reflection to call the hidden `UserGroupInformation` constructor -> here I have no idea what will happen, i.e. how well initialized the instance will be and how to modify its behavior to return something hardcoded
   * We don't write automated tests for places where `UserGroupInformation` is embedded
   * We introduce `KerberosClient` and create functions like `currentUserRealUserOrElseCurrentUserHasKerberosCredentials()` from expressions like `Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials()`
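
To make bullet point 4 concrete, here is a sketch of how a narrow wrapper would let the renewal-possible check in this PR (keytab plus principal, or ticket cache plus Kerberos credentials) be tested with a lambda instead of a real `UserGroupInformation`. `KerberosClient`, `SecuritySettings` and `RenewalPossibleCheck` are hypothetical names; a production `KerberosClient` would presumably delegate to `UserGroupInformation.getCurrentUser().hasKerberosCredentials()`. The catch, as noted above, is that every new UGI expression needs its own wrapper method.

```java
/** Hypothetical wrapper; a production version would delegate to UserGroupInformation. */
@FunctionalInterface
interface KerberosClient {
    // A real implementation would also have to handle the IOException getCurrentUser() can throw.
    boolean currentUserHasKerberosCredentials();
}

/** Hypothetical subset of the settings consulted by the renewal-possible check. */
class SecuritySettings {
    final String keytab;
    final String principal;
    final boolean useTicketCache;

    SecuritySettings(String keytab, String principal, boolean useTicketCache) {
        this.keytab = keytab;
        this.principal = principal;
        this.useTicketCache = useTicketCache;
    }
}

class RenewalPossibleCheck {
    private final SecuritySettings settings;
    private final KerberosClient kerberosClient;

    RenewalPossibleCheck(SecuritySettings settings, KerberosClient kerberosClient) {
        this.settings = settings;
        this.kerberosClient = kerberosClient;
    }

    boolean isRenewalPossible() {
        if (!isBlank(settings.keytab) && !isBlank(settings.principal)) {
            return true; // login from keytab is possible
        }
        // Login from ticket cache is possible only if the user already has credentials.
        return settings.useTicketCache && kerberosClient.currentUserHasKerberosCredentials();
    }

    /** Roughly mirrors StringUtils.isBlank: null, empty, or whitespace only. */
    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```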
   
   Well, none of the proposals looks good, but here in Flink, just like in other places, I haven't found the holy grail.
   In Spark, bullet point 3 has been implemented, which is definitely debatable. In fact, with PowerMock, bullet point 1 would be possible there, but nobody has ever done it.
   
   There is a reason why [such](https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/sections/ugi.html) write-ups exist:
   `If there is one class guaranteed to strike fear into anyone with experience in Hadoop+Kerberos code it is UserGroupInformation, abbreviated to "UGI"`
   
   Let's hear your opinion.
   

[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1090235882

   @flinkbot run azure
   

[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847297674


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
+        // By default, a cancelled task is not automatically removed from the work queue until its
+        // delay elapses. We have to enable it manually.
+        renewalExecutor.setRemoveOnCancelPolicy(true);
+
+        startTGTRenewal();
+    }
+
+    @VisibleForTesting
+    boolean isRenewalPossible() throws IOException {
+        if (!StringUtils.isBlank(securityConfiguration.getKeytab())
+                && !StringUtils.isBlank(securityConfiguration.getPrincipal())) {
+            LOG.debug("Login from keytab is possible");
+            return true;
+        }
+        LOG.debug("Login from keytab is NOT possible");
+
+        if (securityConfiguration.useTicketCache()
+                && UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
+            LOG.debug("Login from ticket cache is possible");
+            return true;
+        }
+        LOG.debug("Login from ticket cache is NOT possible");
+
+        return false;
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            Runnable tgtRenewalTask =
+                    () -> {
+                        try {
+                            LOG.debug("Renewing TGT");
+                            currentUser.checkTGTAndReloginFromKeytab();
+                            LOG.debug("TGT renewed successfully");
+                        } catch (Exception e) {
+                            LOG.error("Error while renewing TGT", e);

Review Comment:
   Since we retry, it makes sense, so I've changed it.
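
Retrying on the next tick only works because the task catches its own exceptions: per the JDK Javadoc of `ScheduledExecutorService.scheduleAtFixedRate`, if any execution of the task encounters an exception, subsequent executions are suppressed. The hypothetical `RetryOnNextTickDemo` below sketches this with a task that fails on its first run; it prints to stderr where the PR uses its logger.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RetryOnNextTickDemo {
    /** Schedules a task that fails on its first run and reports how many runs happened. */
    static int countRuns(boolean catchExceptions, long waitMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failsOnce =
                () -> {
                    if (runs.incrementAndGet() == 1) {
                        throw new IllegalStateException("simulated relogin failure");
                    }
                };
        Runnable task =
                catchExceptions
                        ? () -> {
                            try {
                                failsOnce.run();
                            } catch (RuntimeException e) {
                                // Report and let the next tick retry.
                                System.err.println("Renewal failed, will retry: " + e.getMessage());
                            }
                        }
                        : failsOnce;
        scheduler.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return runs.get();
    }
}
```

Without the catch, the first failure silently ends the periodic renewal; with it, the task keeps running.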


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847507611


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   In the latest code `ComponentMainThreadExecutor` is null. Once I know where it should come from, I'll fill it in.
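
For reference, the executor this revision builds in `start()` can be reproduced with the JDK alone. This is a sketch, not Flink code: `RenewalExecutors` is a hypothetical helper, and a lambda `ThreadFactory` replaces the diff's `ThreadFactoryBuilder`. It keeps the two properties the diff relies on: a daemon thread and `setRemoveOnCancelPolicy(true)`.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

class RenewalExecutors {
    /** Builds a single-threaded, daemon-backed scheduler like the one in the diff, JDK only. */
    static ScheduledThreadPoolExecutor createRenewalExecutor() {
        ThreadFactory threadFactory =
                runnable -> {
                    Thread thread = new Thread(runnable, "Credential Renewal Thread");
                    // Daemon, so the renewal thread never keeps the JVM alive on its own.
                    thread.setDaemon(true);
                    return thread;
                };
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory);
        // By default a cancelled task stays in the work queue until its delay elapses.
        executor.setRemoveOnCancelPolicy(true);
        return executor;
    }
}
```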

[GitHub] [flink] dmvk commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

dmvk commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1097668495

   > Not sure what you mean here. TGT renewal and token obtain are totally different from many factors:
   >
   >    They obtain different things
   >    They does it with different frequency
   >
   > I would like ask you to elaborate on this.
   
   I've just noticed it's not implemented (yet), and my intuition was that the only difference is the frequency. If it does a different thing, then it's fine 👍
   
   > The cluster entrypoint might be confusing for the user if hadoop is not present on the classpath, because we print the
   > missing hadoop warning on DEBUG level.
   
   In that case we could either:
   1) Consider it a misconfiguration and fail fast -> this would require changing the default value of `KERBEROS_FETCH_DELEGATION_TOKEN` to `false`
   2) Print a warning with a clear explanation of what it really means. This is similar to what we did prior to the introduction of the `HadoopDependency` class; the main difference is that this time it wouldn't be an ill-formatted stack trace with no proper explanation.
   
   In both cases, `true` doesn't seem to be a reasonable default for `KERBEROS_FETCH_DELEGATION_TOKEN`. Any thoughts?
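
Both options can be sketched as a small pre-check. Everything here is hypothetical: `TokenFetchPrecheck`, its policy enum and the messages are illustrative, and `fetchTokensEnabled` stands for the value of `KERBEROS_FETCH_DELEGATION_TOKEN`. Hadoop presence could be probed, for example, by checking whether `org.apache.hadoop.conf.Configuration` can be loaded.

```java
/** Hypothetical sketch of the two options discussed for a missing Hadoop dependency. */
class TokenFetchPrecheck {
    enum OnMissingHadoop {
        FAIL_FAST,
        WARN
    }

    /** Returns true if the named class can be loaded (without initializing it). */
    static boolean isClassPresent(String className, ClassLoader classLoader) {
        try {
            Class.forName(className, false, classLoader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Decides whether delegation tokens should be fetched. */
    static boolean shouldFetchTokens(
            boolean fetchTokensEnabled, boolean hadoopPresent, OnMissingHadoop policy) {
        if (!fetchTokensEnabled) {
            return false;
        }
        if (hadoopPresent) {
            return true;
        }
        if (policy == OnMissingHadoop.FAIL_FAST) {
            // Option 1: treat it as a misconfiguration.
            throw new IllegalStateException(
                    "Delegation token fetching is enabled, but Hadoop is not on the classpath. "
                            + "Add Hadoop to the classpath or disable delegation token fetching.");
        }
        // Option 2: explain the situation once, at warning level, and continue without tokens.
        System.err.println(
                "WARN: Delegation token fetching is enabled, but Hadoop is not on the classpath; "
                        + "no delegation tokens will be obtained.");
        return false;
    }
}
```

Usage might look like `shouldFetchTokens(enabled, isClassPresent("org.apache.hadoop.conf.Configuration", loader), OnMissingHadoop.WARN)`.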


[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1091623695

   @flinkbot run azure


[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1090141243

   @flinkbot run azure


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r872223118


##########
pom.xml:
##########
@@ -125,7 +125,7 @@ under the License.
 		<junit4.version>4.13.2</junit4.version>
 		<junit5.version>5.8.1</junit5.version>
 		<archunit.version>0.22.0</archunit.version>
-		<mockito.version>2.21.0</mockito.version>
+		<mockito.version>3.4.6</mockito.version>

Review Comment:
   This is just a POC and open for discussion. Please see [this](https://github.com/apache/flink/pull/19372#discussion_r849691193) comment, where I've considered all the possible solutions I'm aware of. As a result, static method mocking seems like the least painful solution if we want to mock `UGI`.
   
   If we decide not to do that, I can remove the extra commit which contains the Mockito version upgrade. As a consequence, either we drop `testStartTGTRenewalShouldScheduleRenewal` or I would like to hear a different approach to mocking `UGI`. If we decide to go on with this, then I agree this must be a separate JIRA.
   
   I'm basically fine with either approach, though my personal opinion is that in this exceptional case static function mocking would be the least painful solution, with super compact code.


[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1128611123

   @dmvk Thanks for the suggestions, I've applied them all:
   * Added your latest suggestion as commit
   * Squashed it into a single commit
   * Added the following extra commit message: `Updated Mockito version to 3.4.6 in order to use static method mocking`


[GitHub] [flink] dmvk commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

dmvk commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1134248484

   Merging, thanks for the contribution Gabor! 


[GitHub] [flink] dmvk merged pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

dmvk merged PR #19372:
URL: https://github.com/apache/flink/pull/19372


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r854030684


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   @dmvk did you have a chance to consider the situation in depth? I have my own suggestion, which may or may not match your opinion. Namely, if it's not horribly complex, my preference is upgrading to Mockito 3.4.0 (where static function mocking was introduced) and choosing bullet point 1. I have no idea why our Mockito version is so old; maybe there was no agreement to upgrade it? If you have some insights, please share.
   
   If that would be overkill, then I would vote for bullet point 3, because all the other options would add hard-to-maintain and brittle solutions.
   
   WDYT?
   

[GitHub] [flink] dmvk commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r872243204


##########
pom.xml:
##########
@@ -125,7 +125,7 @@ under the License.
 		<junit4.version>4.13.2</junit4.version>
 		<junit5.version>5.8.1</junit5.version>
 		<archunit.version>0.22.0</archunit.version>
-		<mockito.version>2.21.0</mockito.version>
+		<mockito.version>3.4.6</mockito.version>

Review Comment:
   I guess for this case it's justifiable, as Hadoop doesn't provide a reasonable interface you could interact with for testing purposes.





[GitHub] [flink] mbalassi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
mbalassi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1126658703

   @flinkbot run azure




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r872305484


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,25 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService ioExecutor;

Review Comment:
   Added.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849579153


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;

Review Comment:
   Changed.





[GitHub] [flink] dmvk commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847258427


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   I'd love to avoid adding a new executor here (in general we try to avoid thread pollution where possible). We should be able to reuse the existing executors that are already available to the `ResourceManager`:
   - ComponentMainThreadExecutor (main thread for the ResourceManager endpoint; can be used for scheduling; can't have blocking runnables otherwise we'd disrupt the endpoint availability)
   - ioExecutor (for blocking io heavy calls)
   
   Basically the start method would become something along these lines:
   
   ```java
       public void start(ComponentMainThreadExecutor mainThreadExecutor, Executor ioExecutor) {
           final long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
           // The renewal future needs to be cancelled during close.
           final ScheduledFuture<?> renewalFuture =
                   mainThreadExecutor.schedule(
                           () ->
                                   ioExecutor.execute(
                                           () -> {
                                               // Todo...
                                           }),
                           tgtRenewalPeriod,
                           TimeUnit.MILLISECONDS);
       }
   ```
   
   WDYT?
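   
   A minimal, JDK-only sketch of the periodic variant of this pattern: a shared scheduler thread only triggers the work, the potentially blocking relogin runs on an I/O executor, and the returned future is kept so it can be cancelled on close. `PeriodicReloginSketch` and the `relogin` runnable (standing in for `checkTGTAndReloginFromKeytab()`) are hypothetical names, and Flink's `ComponentMainThreadExecutor`/`ScheduledExecutor` types are replaced by `java.util.concurrent` ones here.
   
   ```java
   import java.util.concurrent.Executor;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   
   /** Hypothetical sketch: schedule on a shared scheduler, run blocking work on an I/O pool. */
   class PeriodicReloginSketch {
       private final ScheduledExecutorService scheduler; // shared; must never block
       private final Executor ioExecutor; // allowed to run blocking calls
       private final Runnable relogin; // stand-in for checkTGTAndReloginFromKeytab()
       private ScheduledFuture<?> renewalFuture; // kept so that stop() can cancel it
   
       PeriodicReloginSketch(
               ScheduledExecutorService scheduler, Executor ioExecutor, Runnable relogin) {
           this.scheduler = scheduler;
           this.ioExecutor = ioExecutor;
           this.relogin = relogin;
       }
   
       synchronized void start(long periodMillis) {
           if (renewalFuture != null) {
               throw new IllegalStateException("Manager is already started");
           }
           renewalFuture =
                   scheduler.scheduleAtFixedRate(
                           // The scheduler thread only hands the work off to the I/O executor.
                           () ->
                                   ioExecutor.execute(
                                           () -> {
                                               try {
                                                   relogin.run();
                                               } catch (RuntimeException e) {
                                                   // Real code would log this; a failed
                                                   // relogin should not escape the I/O task.
                                               }
                                           }),
                           periodMillis,
                           periodMillis,
                           TimeUnit.MILLISECONDS);
       }
   
       synchronized void stop() {
           if (renewalFuture != null) {
               // No further ticks; a tick already handed to the I/O executor may still run.
               renewalFuture.cancel(false);
               renewalFuture = null;
           }
       }
   }
   ```
   
   Keeping the future and cancelling it in `stop()` is what the "needs to be cancelled during close" remark asks for; the `Manager is already started` check mirrors the `checkState` in the diff.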
   
   





[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1091146733

   @flinkbot run azure




[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1095061969

   > The cluster entrypoint might be confusing for the user if hadoop is not present on the classpath, because we print the missing hadoop warning on DEBUG level.
   
   What do you suggest as a solution?
   




[GitHub] [flink] dmvk commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849229800


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   🤔 After a little bit of digging, I found that there is actually one more (scheduled) executor, shared by the RPC services, that we could leverage. It should still be used in combination with the `ioExecutor`, because blocking there could potentially delay other critical things (e.g. the TM registration process).
   
   Both `ioExecutor` and `rpcService.getScheduledExecutor()` are already available in the `ClusterEntrypoint`, so it should be straightforward to pass them via the constructor without having to create additional factories.
   
   Any thoughts?
   
   ```diff
   diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
   index 01c5f5f1c1c..8ee77e80d9c 100755
   --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
   +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
   @@ -395,7 +395,10 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                        configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
                                        && HadoopDependency.isHadoopCommonOnClasspath(
                                                getClass().getClassLoader())
   -                            ? new KerberosDelegationTokenManager(configuration)
   +                            ? new KerberosDelegationTokenManager(
   +                                    configuration,
   +                                    commonRpcService.getScheduledExecutor(),
   +                                    ioExecutor)
                                : new NoOpDelegationTokenManager();
                metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
   ``` 
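   
   A sketch of why constructor injection helps here: if both executors come in through the constructor, a test can hand in an inline executor and a manually triggered scheduler instead of real threads. `FixedRateScheduler` is a hypothetical single-method stand-in for Flink's `ScheduledExecutor`, and `InjectedRenewalManager`/`ManualScheduler` are illustrative names, not Flink classes.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.Executor;
   import java.util.concurrent.TimeUnit;
   
   /** Hypothetical single-method stand-in for Flink's ScheduledExecutor. */
   interface FixedRateScheduler {
       /** Schedules a periodic task; running the returned handle cancels it. */
       Runnable scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
   }
   
   /** Hypothetical manager: all threads are injected, none are created here. */
   class InjectedRenewalManager {
       private final FixedRateScheduler scheduler;
       private final Executor ioExecutor;
       private final Runnable relogin;
       private Runnable cancelHandle;
   
       InjectedRenewalManager(FixedRateScheduler scheduler, Executor ioExecutor, Runnable relogin) {
           this.scheduler = scheduler;
           this.ioExecutor = ioExecutor;
           this.relogin = relogin;
       }
   
       void start(long periodMillis) {
           cancelHandle =
                   scheduler.scheduleAtFixedRate(
                           () -> ioExecutor.execute(relogin),
                           periodMillis,
                           periodMillis,
                           TimeUnit.MILLISECONDS);
       }
   
       void stop() {
           if (cancelHandle != null) {
               cancelHandle.run();
               cancelHandle = null;
           }
       }
   }
   
   /** Test double: records periodic tasks and runs them only when triggered. */
   class ManualScheduler implements FixedRateScheduler {
       private final List<Runnable> tasks = new ArrayList<>();
   
       @Override
       public Runnable scheduleAtFixedRate(
               Runnable command, long initialDelay, long period, TimeUnit unit) {
           tasks.add(command);
           return () -> tasks.remove(command);
       }
   
       /** Simulates one period elapsing for every registered task. */
       void triggerAll() {
           new ArrayList<>(tasks).forEach(Runnable::run);
       }
   
       int scheduledCount() {
           return tasks.size();
       }
   }
   ```
   
   In production the entrypoint would pass the shared RPC scheduler and `ioExecutor`; in a test, `Runnable::run` as the I/O executor plus `ManualScheduler` make every tick deterministic.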





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849590001


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();
+                                                    LOG.debug("TGT renewed successfully");
+                                                } catch (Exception e) {
+                                                    LOG.error("Error while renewing TGT", e);
+                                                }
+                                            }),
+                            tgtRenewalPeriod,
+                            tgtRenewalPeriod,
+                            TimeUnit.MILLISECONDS);
+            LOG.debug("Credential renewal task started and reoccur in {} ms", tgtRenewalPeriod);

Review Comment:
   Changed to TGT in all places. Kerberos has hundreds of obscure concepts, and encoding them in class and/or function names doesn't help. My intention is to add complete documentation like [this](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/security/README.md), which covers all the conceptual/architectural explanations. Here we renew the Ticket Granting Ticket (TGT), and the code comment explains why this is needed:
   ```
               // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
               // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
               // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
               // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
               // the TGT does not need to be renewed yet.
   ```
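   
   To illustrate what "no-op if the TGT does not need to be renewed yet" means for a fixed relogin period, here is a sketch of a renewal-window check: the periodic task can fire often, while an actual relogin happens only once most of the ticket lifetime has elapsed. The 80% threshold and the `TgtRenewalWindow` name are assumptions for illustration; the authoritative logic lives in Hadoop's `UserGroupInformation`.
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   
   /** Hypothetical renewal-window check; the 80% threshold is an assumption. */
   final class TgtRenewalWindow {
       // Relogin once 4/5 (80%) of the ticket lifetime has elapsed; integer math avoids rounding.
       private static final long WINDOW_NUMERATOR = 4;
       private static final long WINDOW_DENOMINATOR = 5;
   
       private TgtRenewalWindow() {}
   
       /** The instant after which a ticket valid from {@code start} to {@code end} should be refreshed. */
       static Instant refreshTime(Instant start, Instant end) {
           long lifetimeMillis = Duration.between(start, end).toMillis();
           return start.plusMillis(lifetimeMillis * WINDOW_NUMERATOR / WINDOW_DENOMINATOR);
       }
   
       /** True if a periodic check at {@code now} should actually relogin; otherwise it is a no-op. */
       static boolean needsRelogin(Instant start, Instant end, Instant now) {
           return !now.isBefore(refreshTime(start, end));
       }
   }
   ```
   
   With the 1 minute default period from this PR, most ticks against a 10 hour ticket would be no-ops until about the 8 hour mark under this assumed window.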
   





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r848157811


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   After a deeper look, I've found the following obstacles that block us from using the mentioned executors:
   * If you mean `ResourceManager.getMainThreadExecutor` in the first bullet point, that's a `MainThreadExecutor` instance which doesn't support `scheduleAtFixedRate`, please see [here](https://github.com/apache/flink/blob/4034d3cd6d13e88e2e5ca101510bf333e94a53fa/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java#L537). We could implement that from scratch, but touching core threading can be super dangerous, though I'm not against adding such functionality.
   * If we relied on `ResourceManager.getMainThreadExecutor`, then we would have to modify the interface (the `start` function) and put implementation details (the 2 executors) into the interface, which would frustrate me a bit. I think if we want dependency injection, then the proper place is the constructor, which doesn't touch the interface. If we decide to modify the interface I can accept that, but I think the implementation would be less clean.
   * If we added the new `scheduleAtFixedRate` functionality, then the `ResourceManager` usage would be covered, but in [MiniCluster](https://github.com/apache/flink/blob/f089a99ce73ad15531a4d1b72899d8367fab662a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L431) we must provide something too, and no such executor exists there (at least I haven't found one), so we would need to come up with a new executor anyway.
   
   All in all, I've left the original code there for now until we reach an agreement.
   
   I understand the goal of not polluting the process with threads, but in this case the return on investment is low: in some cases a new executor is needed anyway, while the code would become more complex.
   
   I'd like to hear your opinion on this.
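   
   Regarding the first point: an executor that only offers one-shot `schedule` can still drive a periodic task if the task re-arms itself after each run. A sketch, where `OneShotScheduler` is a hypothetical stand-in for such an executor; note that this yields fixed-delay rather than true fixed-rate semantics, which is probably acceptable for a relogin check but is a behavioral difference worth keeping in mind.
   
   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;
   
   /** Hypothetical stand-in for an executor that only offers one-shot scheduling. */
   interface OneShotScheduler {
       void schedule(Runnable command, long delay, TimeUnit unit);
   }
   
   /** Repeats {@code body} by re-scheduling itself after each run (fixed-delay semantics). */
   final class SelfReschedulingTask implements Runnable {
       private final OneShotScheduler scheduler;
       private final Runnable body;
       private final long periodMillis;
       private final AtomicBoolean cancelled = new AtomicBoolean(false);
   
       SelfReschedulingTask(OneShotScheduler scheduler, Runnable body, long periodMillis) {
           this.scheduler = scheduler;
           this.body = body;
           this.periodMillis = periodMillis;
       }
   
       void start() {
           scheduler.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
       }
   
       /** Stops the chain: an already scheduled run becomes a no-op and does not re-arm. */
       void cancel() {
           cancelled.set(true);
       }
   
       @Override
       public void run() {
           if (cancelled.get()) {
               return;
           }
           try {
               body.run();
           } finally {
               // Re-arm even if body failed (the exception still propagates), unless cancelled.
               if (!cancelled.get()) {
                   scheduler.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
               }
           }
       }
   }
   ```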
   





[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1097927982

   I've made all the changes and re-tested them on a cluster; it works. Please find the log attached.
   [dt.log](https://github.com/apache/flink/files/8480947/dt.log)
   




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r855434579


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   It seems all the issues are fixed and all the tests passed.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849691193


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   I've had a deeper look; let me summarize my findings:
   * No mocking framework is allowed
   * The `UserGroupInformation` class has no public constructor, so instances can only be created via reflection, which I'm pretty sure won't initialize them properly. As a result, every place where `UserGroupInformation` is used would need to be hacked around.
   * If we go in the `KerberosClient` direction, we would end up with functions like `KerberosClient.hasCurrentUserKerberosCredentials()` instead of `UserGroupInformation.getCurrentUser().hasKerberosCredentials()`, which is hacky but doable
   * But as soon as the condition gets complicated, for example `Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials()`, how should the `KerberosClient` function be named?
   
   In Spark and other components I've tried to mock/reimplement/modify/make `UserGroupInformation` testable without any success. I think we have the same situation here unless you have a clear, doable suggestion.
   
   Realistically, I think we have the following possibilities for this case:
   * Mock the `UserGroupInformation.getCurrentUser()` static function and return a mocked `UserGroupInformation` instance -> here the PowerMock runner simply doesn't work with JUnit 5, and our Mockito version is too old to mock static functions. All in all, only a Mockito version upgrade could be a potential solution here.
   * Use reflection to call the hidden `UserGroupInformation` constructor -> here I have no idea what will happen, i.e. how well initialized the instance will be and how to modify its behavior to return something hardcoded
   * We don't write automated tests for places where `UserGroupInformation` is embedded
   * We introduce `KerberosClient` and we create functions like `currentUserRealUserOrElseCurrentUserHasKerberosCredentials()` from expressions like `Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials()`
   
   None of the proposals looks good, but just like in other projects, I haven't found the holy grail here in Flink either.
   In Spark, bullet point 3 was implemented, which is definitely debatable. With PowerMock, bullet point 1 would have been possible there, but nobody has ever done it.
   
   There is a reason why [writings like this](https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/sections/ugi.html) exist:
   `If there is one class guaranteed to strike fear into anyone with experience in Hadoop+Kerberos code it is UserGroupInformation, abbreviated to "UGI"`
   
   Let's hear your opinion.
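   
   For comparison, a sketch of a thin seam in the spirit of the fourth option, kept deliberately narrow: instead of one wrapper function per UGI expression, the manager depends on a small interface covering only the calls it makes. `KerberosLogin`, `TgtRenewalLogic` and `FakeKerberosLogin` are hypothetical; a production implementation would delegate to `UserGroupInformation.getCurrentUser()`, which is not shown because it needs Hadoop on the classpath.
   
   ```java
   import java.io.IOException;
   
   /**
    * Hypothetical seam over the UGI calls the manager needs. A production implementation
    * would delegate to UserGroupInformation.getCurrentUser() (not shown: needs Hadoop).
    */
   interface KerberosLogin {
       boolean isFromKeytab();
   
       void checkTgtAndReloginFromKeytab() throws IOException;
   }
   
   /** Renewal logic that depends only on the seam, never on UGI statics. */
   final class TgtRenewalLogic {
       private final KerberosLogin login;
   
       TgtRenewalLogic(KerberosLogin login) {
           this.login = login;
       }
   
       /** Periodic relogin only makes sense for keytab-based logins. */
       boolean shouldScheduleRenewal() {
           return login.isFromKeytab();
       }
   
       /** One renewal tick; returns false instead of throwing so the schedule keeps running. */
       boolean renewOnce() {
           try {
               login.checkTgtAndReloginFromKeytab();
               return true;
           } catch (IOException e) {
               return false; // real code would log the failure
           }
       }
   }
   
   /** Test fake: no Hadoop, no reflection, no static mocking. */
   final class FakeKerberosLogin implements KerberosLogin {
       private final boolean fromKeytab;
       private final boolean failRelogin;
       int reloginCalls;
   
       FakeKerberosLogin(boolean fromKeytab, boolean failRelogin) {
           this.fromKeytab = fromKeytab;
           this.failRelogin = failRelogin;
       }
   
       @Override
       public boolean isFromKeytab() {
           return fromKeytab;
       }
   
       @Override
       public void checkTgtAndReloginFromKeytab() throws IOException {
           reloginCalls++;
           if (failRelogin) {
               throw new IOException("simulated KDC failure");
           }
       }
   }
   ```
   
   The trade-off is the one described above: the seam stays testable only while it mirrors a handful of calls; complex UGI expressions would still have to live in the production implementation, untested.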
   





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849578739


##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -424,12 +424,21 @@ public void start() throws Exception {
 
                 heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 
-                delegationTokenManager =
-                        configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
-                                        && HadoopDependency.isHadoopCommonOnClasspath(
-                                                getClass().getClassLoader())
-                                ? new KerberosDelegationTokenManager(configuration)
-                                : new NoOpDelegationTokenManager();
+                if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
+                    if (HadoopDependency.isHadoopCommonOnClasspath(getClass().getClassLoader())) {
+                        delegationTokenManager =
+                                new KerberosDelegationTokenManager(
+                                        configuration,
+                                        commonRpcService.getScheduledExecutor(),
+                                        ioExecutor);
+                    } else {
+                        LOG.info(
+                                "Cannot use kerberos delegation token manager because Hadoop cannot be found in the Classpath.");
+                        delegationTokenManager = new NoOpDelegationTokenManager();
+                    }
+                } else {
+                    delegationTokenManager = new NoOpDelegationTokenManager();
+                }

Review Comment:
   `KerberosDelegationTokenManagerFactory` added.
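   
   A sketch of the selection logic such a factory could encapsulate, mirroring the `if`/`else` in the `MiniCluster` diff. The probe class name used to detect Hadoop and all class/method names here are assumptions, not Flink's `HadoopDependency` implementation; the Kerberos manager is created through a `Supplier` so Hadoop-dependent construction only happens when Hadoop is known to be present.
   
   ```java
   import java.util.function.Supplier;
   import java.util.logging.Logger;
   
   /** Hypothetical factory sketch; names and the probe class are assumptions. */
   final class TokenManagerFactorySketch {
       private static final Logger LOG = Logger.getLogger(TokenManagerFactorySketch.class.getName());
   
       // Assumption: a core Hadoop class whose presence signals "Hadoop is on the classpath".
       static final String HADOOP_PROBE_CLASS = "org.apache.hadoop.conf.Configuration";
   
       private TokenManagerFactorySketch() {}
   
       static boolean isOnClasspath(String className, ClassLoader classLoader) {
           try {
               // initialize = false: only check presence, don't run static initializers.
               Class.forName(className, false, classLoader);
               return true;
           } catch (ClassNotFoundException | LinkageError e) {
               return false;
           }
       }
   
       /** Kerberos manager only if fetching is enabled and Hadoop is present; else no-op. */
       static <T> T create(
               boolean fetchDelegationTokens,
               boolean hadoopOnClasspath,
               Supplier<T> kerberosManager,
               Supplier<T> noOpManager) {
           if (!fetchDelegationTokens) {
               return noOpManager.get();
           }
           if (!hadoopOnClasspath) {
               LOG.info(
                       "Cannot use kerberos delegation token manager because Hadoop cannot be found on the classpath.");
               return noOpManager.get();
           }
           // The Supplier keeps Hadoop-dependent construction lazy.
           return kerberosManager.get();
       }
   }
   ```
   
   A caller would pass something like `isOnClasspath(HADOOP_PROBE_CLASS, getClass().getClassLoader())` as the second argument, keeping both entrypoints (`ClusterEntrypoint` and `MiniCluster`) on one code path.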



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;
+
+    private ScheduledFuture<?> tgtRenewalFuture;

Review Comment:
   Fixed.





[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1097820808

   > As long as this would be discussed on the mailing list, it should IMO be ok-ish as it's a change to the configuration and doesn't break any APIs.
   
   Good to know how things are usually done here.
   
   > I'm not sure that I correctly interpret this sentence. Are you suggesting that we should use an info level instead of a warning for the message? If yes, I'm OK with that 👍
   
   Yes, that's exactly what I meant. As mentioned, changing the default would require a discussion, so I would vote for the info message (though, like you, I find the default a bit odd). We can reopen the default-value question later, once everything works.
   




[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1126687852

   I've rebased the whole change onto the latest master. Maybe this helps, since other PRs are not suffering from this:
   ```
   --2022-05-14 08:05:31--  http://security.ubuntu.com/ubuntu/pool/main/o/openssl1.0/libssl1.0.0_1.0.2n-1ubuntu5.8_amd64.deb
   Resolving security.ubuntu.com (security.ubuntu.com)... 185.125.190.36, 91.189.91.38, 185.125.190.39, ...
   Connecting to security.ubuntu.com (security.ubuntu.com)|185.125.190.36|:80... connected.
   HTTP request sent, awaiting response... 404 Not Found
   2022-05-14 08:05:31 ERROR 404: Not Found.
   ```
   




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r872215096


##########
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##########
@@ -97,13 +98,14 @@
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 /** Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle. */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+@PrepareForTest(FlinkKinesisConsumer.class)
 public class FlinkKinesisConsumerTest extends TestLogger {

Review Comment:
   This is related to the Mockito version change, which we may or may not do in a separate JIRA.





[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1126717741

   Finally, with all the modifications, the tests passed :)




[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1128815905

   @flinkbot run azure




[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1090337233

   Please be aware that I've started testing it on k8s but haven't finished yet: https://gist.github.com/gaborgsomogyi/ac4f71ead8494da2f5c35265bcb1e885




[GitHub] [flink] dmvk commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847258427


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   I'd love to avoid adding a new executor here (in general we try to avoid thread pollution where possible). We should be able to reuse the existing executors that are already available to the `ResourceManager`:
   - ComponentMainThreadExecutor (main thread for the ResourceManager endpoint; can be used for scheduling; can't have blocking runnables otherwise we'd disrupt the endpoint availability)
   - ioExecutor (for blocking io heavy calls)
   
   Basically the start method would become something along these lines:
   
   ```java
           final long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
           // The renewal future needs to be cancelled during close.
           final ScheduledFuture<?> renewalFuture =
                   mainThreadExecutor.schedule(
                           () ->
                                   ioExecutor.execute(
                                           () -> {
                                               // Todo...
                                           }),
                           tgtRenewalPeriod,
                           TimeUnit.MILLISECONDS);
   ```
   
   WDYT?
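   The pattern in this suggestion uses a non-blocking scheduler that only triggers, plus a separate executor for the blocking call. It can be sketched with plain JDK executors. This is a minimal sketch, assuming a `ScheduledExecutorService` stands in for the main-thread executor and an ordinary pool stands in for `ioExecutor`. `scheduleOffloaded` and `demoRunsOffSchedulerThread` are hypothetical helpers, not Flink APIs.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicReference;

   public class OffloadedRenewalSketch {

       /** The scheduler thread only triggers; the (possibly blocking) work runs on ioExecutor. */
       static ScheduledFuture<?> scheduleOffloaded(
               ScheduledExecutorService scheduler,
               ExecutorService ioExecutor,
               Runnable blockingWork,
               long delayMillis) {
           return scheduler.schedule(
                   () -> ioExecutor.execute(blockingWork), delayMillis, TimeUnit.MILLISECONDS);
       }

       /** Runs one offloaded task and reports whether it ran off the scheduler's thread. */
       static boolean demoRunsOffSchedulerThread() {
           ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
           ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
           AtomicReference<Thread> schedulerThread = new AtomicReference<>();
           AtomicReference<Thread> workerThread = new AtomicReference<>();
           CountDownLatch done = new CountDownLatch(1);
           try {
               // Record which thread the scheduler uses.
               scheduler.submit(() -> schedulerThread.set(Thread.currentThread())).get();
               ScheduledFuture<?> future =
                       scheduleOffloaded(
                               scheduler,
                               ioExecutor,
                               () -> {
                                   // A blocking call (e.g. a Kerberos relogin) would go here.
                                   workerThread.set(Thread.currentThread());
                                   done.countDown();
                               },
                               10);
               boolean ran = done.await(5, TimeUnit.SECONDS);
               future.cancel(false); // the handle that must be cancelled on close
               return ran && workerThread.get() != schedulerThread.get();
           } catch (Exception e) {
               return false;
           } finally {
               scheduler.shutdownNow();
               ioExecutor.shutdownNow();
           }
       }

       public static void main(String[] args) {
           System.out.println("ran off scheduler thread: " + demoRunsOffSchedulerThread());
       }
   }
   ```

   The returned `ScheduledFuture` is the handle that, as the comment notes, needs to be cancelled during close.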
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =

Review Comment:
   nit: Please mark variables as `final` where possible.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -79,4 +93,48 @@ public void testAllProvidersLoaded() {
         assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
     }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackFalseByDefault() throws IOException {
+        UserGroupInformation ugi = PowerMockito.mock(UserGroupInformation.class);
+        PowerMockito.mockStatic(UserGroupInformation.class);
+        when(UserGroupInformation.getCurrentUser()).thenReturn(ugi);
+
+        ExceptionThrowingDelegationTokenProvider.enabled = false;
+        Configuration configuration = new Configuration();
+        KerberosDelegationTokenManager delegationTokenManager =
+                new KerberosDelegationTokenManager(configuration);
+
+        assertFalse(delegationTokenManager.isRenewalPossible());
+    }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackTrueWhenKeytab() throws IOException {

Review Comment:
   We should use the JUnit 5 resources for temporary files, e.g.:
   
   ```suggestion
       public void isRenewalPossibleMustGiveBackTrueWhenKeytab(@TempDir Path tmpDir) throws IOException {
           final Path file = Files.createFile(tmpDir.resolve("test.keytab"));
   ```
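   The tests above exercise `isRenewalPossible()`. In this PR that method reduces to a small boolean decision. A keytab login wins if both keytab and principal are set. Otherwise the ticket cache counts only when it is enabled and the current user actually has Kerberos credentials. The sketch below is an illustration that assumes plain parameters replace `SecurityConfiguration` and `UserGroupInformation.getCurrentUser().hasKerberosCredentials()`. Note that the original only calls `getCurrentUser()` when the ticket cache is enabled, because `&&` short-circuits.

   ```java
   /** Sketch of the two-step check; plain parameters stand in for the real config and UGI. */
   public class RenewalPossibleSketch {

       /** Approximates commons-lang StringUtils.isBlank: null, empty or whitespace only. */
       static boolean isBlank(String s) {
           return s == null || s.trim().isEmpty();
       }

       static boolean isRenewalPossible(
               String keytab,
               String principal,
               boolean useTicketCache,
               boolean currentUserHasKerberosCredentials) {
           if (!isBlank(keytab) && !isBlank(principal)) {
               return true; // "Login from keytab is possible"
           }
           // The ticket cache only counts if it is enabled AND actually holds Kerberos credentials.
           return useTicketCache && currentUserHasKerberosCredentials;
       }

       public static void main(String[] args) {
           System.out.println(isRenewalPossible("/etc/krb5.keytab", "flink@EXAMPLE.COM", false, false));
           System.out.println(isRenewalPossible(null, null, true, false));
       }
   }
   ```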



##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -20,13 +20,27 @@
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;

Review Comment:
   We shouldn't be adding any new tests that are using `Mockito` for mocking. Here is a more detailed reasoning behind the decision: https://docs.google.com/presentation/d/1fZlTjOJscwmzYadPGl23aui6zopl94Mn5smG-rB0qT8/edit#slide=id.g2fa61f7d00_0_99
   
   Since `UserGroupInformation` in its current form requires mocking static methods, you'll most likely need to hide it behind a new interface that can be replaced for testing.
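   A minimal sketch of the suggested seam, assuming a small interface whose production implementation would wrap the static `UserGroupInformation` calls. `RenewalPossibleProvider` and `RenewalStarter` are hypothetical names. For brevity, the sketch omits the `IOException` that the real lookup can throw.

   ```java
   public class InterfaceSeamSketch {

       /** Hypothetical seam around the static UserGroupInformation lookups. */
       interface RenewalPossibleProvider {
           boolean isRenewalPossible();
       }

       /** Code under test depends only on the interface, never on static UGI calls. */
       static final class RenewalStarter {
           private final RenewalPossibleProvider provider;

           RenewalStarter(RenewalPossibleProvider provider) {
               this.provider = provider;
           }

           /** Returns true if the renewal task would be started. */
           boolean start() {
               if (!provider.isRenewalPossible()) {
                   System.out.println("Renewal is NOT possible, skipping to start renewal task");
                   return false;
               }
               return true;
           }
       }

       public static void main(String[] args) {
           // In a test, a lambda replaces the real provider: no static mocking required.
           System.out.println(new RenewalStarter(() -> true).start());
           System.out.println(new RenewalStarter(() -> false).start());
       }
   }
   ```

   Tests then pass a lambda instead of mocking static methods, which works with plain JUnit 5.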
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
+        // By default, a cancelled task is not automatically removed from the work queue until its
+        // delay elapses. We have to enable it manually.
+        renewalExecutor.setRemoveOnCancelPolicy(true);
+
+        startTGTRenewal();
+    }
+
+    @VisibleForTesting
+    boolean isRenewalPossible() throws IOException {
+        if (!StringUtils.isBlank(securityConfiguration.getKeytab())
+                && !StringUtils.isBlank(securityConfiguration.getPrincipal())) {
+            LOG.debug("Login from keytab is possible");
+            return true;
+        }
+        LOG.debug("Login from keytab is NOT possible");
+
+        if (securityConfiguration.useTicketCache()
+                && UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
+            LOG.debug("Login from ticket cache is possible");
+            return true;
+        }
+        LOG.debug("Login from ticket cache is NOT possible");
+
+        return false;
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            Runnable tgtRenewalTask =
+                    () -> {
+                        try {
+                            LOG.debug("Renewing TGT");
+                            currentUser.checkTGTAndReloginFromKeytab();
+                            LOG.debug("TGT renewed successfully");
+                        } catch (Exception e) {
+                            LOG.error("Error while renewing TGT", e);

Review Comment:
   warning?
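   The `try`/`catch` in `tgtRenewalTask` matters beyond the log level. The JDK documentation for `ScheduledExecutorService.scheduleAtFixedRate` states that if any execution of the task encounters an exception, subsequent executions are suppressed. Assuming the task is scheduled directly on the renewal executor at a fixed rate, an uncaught failure would therefore silently stop all later relogins. Below is a minimal sketch of guarding such a task. It assumes a deliberately failing `Runnable` as a stand-in for `checkTGTAndReloginFromKeytab()` and uses `System.err` in place of a logger. `guarded` and `attemptsWithin` are hypothetical helpers.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   public class GuardedFixedRateSketch {

       /** Wraps a task so a failure is logged instead of cancelling the fixed-rate schedule. */
       static Runnable guarded(Runnable task) {
           return () -> {
               try {
                   task.run();
               } catch (RuntimeException e) {
                   // Warn-level: the next period simply tries the relogin again.
                   System.err.println("WARN Error while renewing TGT: " + e.getMessage());
               }
           };
       }

       /** Schedules an always-failing task at a fixed rate and counts how often it actually ran. */
       static int attemptsWithin(boolean guard, int wanted, long timeoutMillis) {
           ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
           AtomicInteger attempts = new AtomicInteger();
           CountDownLatch latch = new CountDownLatch(wanted);
           Runnable failing =
                   () -> {
                       attempts.incrementAndGet();
                       latch.countDown();
                       throw new IllegalStateException("relogin failed");
                   };
           ScheduledFuture<?> future =
                   ses.scheduleAtFixedRate(
                           guard ? guarded(failing) : failing, 0, 5, TimeUnit.MILLISECONDS);
           try {
               latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           } finally {
               future.cancel(false);
               ses.shutdownNow();
           }
           return attempts.get();
       }

       public static void main(String[] args) {
           System.out.println("unguarded attempts: " + attemptsWithin(false, 3, 300));
           System.out.println("guarded attempts: " + attemptsWithin(true, 3, 5000));
       }
   }
   ```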



##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -20,13 +20,27 @@
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;

Review Comment:
   -> junit5





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847315707


##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -20,13 +20,27 @@
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;

Review Comment:
   Fixed.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847413385


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   I'm fine with using the already existing executors, but I don't see any `ComponentMainThreadExecutor` in `ResourceManager`. Can you point me to where I can find it, so I can pass it to the DTM?





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847504606


##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -20,13 +20,27 @@
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;

Review Comment:
   I basically don't agree that mocking is bad in general, but this case is special.
   JUnit 5 does not support PowerMock, so the only other option is overriding a function.
   Adding an interface is also possible, but I felt it would be overkill.
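   The "function override" alternative can be sketched as follows. It assumes the static `UserGroupInformation` lookup is moved into a protected method that a test subclass overrides. `TokenManager`, `TestTokenManager` and `hasKerberosCredentials` are hypothetical names used for illustration only.

   ```java
   public class OverrideSeamSketch {

       /** Production-style class: the static lookup is isolated in one overridable method. */
       static class TokenManager {
           /** Would consult UserGroupInformation in real code; hard-coded here (assumption). */
           protected boolean hasKerberosCredentials() {
               return false;
           }

           boolean isRenewalPossible(boolean useTicketCache) {
               return useTicketCache && hasKerberosCredentials();
           }
       }

       /** Test double: overrides only the seam, so no static mocking is needed. */
       static class TestTokenManager extends TokenManager {
           private final boolean credentials;

           TestTokenManager(boolean credentials) {
               this.credentials = credentials;
           }

           @Override
           protected boolean hasKerberosCredentials() {
               return credentials;
           }
       }

       public static void main(String[] args) {
           System.out.println(new TestTokenManager(true).isRenewalPossible(true));
           System.out.println(new TestTokenManager(false).isRenewalPossible(true));
       }
   }
   ```

   Compared with an interface, this needs no extra type. However, it ties tests to the class hierarchy and requires the class to be non-final.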





[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1091638288

   Just to note the original source of this code: it basically comes from Spark, with some modifications: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala




[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1093068781

   @flinkbot run azure




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r854939573


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   In the meantime I've had a deeper look at the mock framework versions and [here](https://gist.github.com/gaborgsomogyi/4e8b120cbebde6d2a6903e5fcccbbaff) is the extract.
   
   I've added a unit test that relies on a Mockito version upgrade, which we may or may not agree on.
   This single test works, but I'm not sure about all the other tests; waiting on Jenkins to show us.
   If we decide this is not the direction, or the other tests don't pass, I can roll it back easily.
   As a general note from my side: even if we choose this solution, I think it would be good to split it into 3 PRs:
   * This PR w/o startTGTRenewal test
   * Upgrade mockito version
   * Add startTGTRenewal test
   
   If this doesn't work or you disagree, please suggest a way forward.
   





[GitHub] [flink] dmvk commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1125904196

   I'm fine-ish with using Mockito for UGI, as that's a very unreasonable interface to interact with (and it's also pretty isolated to the DTM component; my feeling is that this doesn't need to spread to other tests in the follow-up PRs).
   
   Can you please create a JIRA for the dependency upgrade? (It doesn't need a separate PR; it should just be linked to the appropriate commit that bumps the Mockito version.)




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r848157811


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   After a deeper look, I've found the following obstacles that prevent us from using the mentioned executors:
   * If you mean `ResourceManager.getMainThreadExecutor` in the first bullet point, that's a `MainThreadExecutor` instance, which doesn't support `scheduleAtFixedRate`; please see [here](https://github.com/apache/flink/blob/4034d3cd6d13e88e2e5ca101510bf333e94a53fa/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java#L537). We could implement that from scratch, but touching core threading can be very dangerous, though I'm not against adding such functionality.
   * If we relied on `ResourceManager.getMainThreadExecutor`, we would have to modify the interface (the `start` function) and put implementation details (the 2 executors) into it, which I'd rather avoid. If we want dependency injection, I think the proper place is the constructor, which doesn't touch the interface. I can accept modifying the interface if we decide to, but I think the implementation would be less clean.
   * If we added the new `scheduleAtFixedRate` functionality, the `ResourceManager` usage would be covered, but in [MiniCluster](https://github.com/apache/flink/blob/f089a99ce73ad15531a4d1b72899d8367fab662a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L431) we must provide something too, and no such executor exists there (at least I haven't found one), so we would need to come up with a new executor anyway.
   
   All in all, I've left the original code in place for now until we reach an agreement.
   I'd like to hear your thoughts on this.
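   Per the comment above, `MainThreadExecutor` only offers one-shot scheduling. Periodic behaviour could in principle be emulated by having the task re-schedule itself after each run. Below is a minimal sketch that assumes a hypothetical `OneShotScheduler` interface. This gives fixed-delay rather than true fixed-rate semantics, and it is not how Flink's executors are implemented.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;

   public class SelfReschedulingSketch {

       /** Hypothetical executor that, like the one discussed, only offers one-shot scheduling. */
       interface OneShotScheduler {
           void schedule(Runnable task, long delay, TimeUnit unit);
       }

       /** Repeats task with a fixed delay by re-arming itself; setting the flag stops the loop. */
       static AtomicBoolean schedulePeriodically(
               OneShotScheduler scheduler, Runnable task, long periodMillis) {
           AtomicBoolean cancelled = new AtomicBoolean(false);
           Runnable[] loop = new Runnable[1];
           loop[0] =
                   () -> {
                       if (cancelled.get()) {
                           return;
                       }
                       try {
                           task.run();
                       } catch (RuntimeException e) {
                           // Log and continue so one failure does not end the periodic renewal.
                           System.err.println("Periodic task failed: " + e.getMessage());
                       }
                       if (!cancelled.get()) {
                           scheduler.schedule(loop[0], periodMillis, TimeUnit.MILLISECONDS);
                       }
                   };
           scheduler.schedule(loop[0], periodMillis, TimeUnit.MILLISECONDS);
           return cancelled;
       }

       public static void main(String[] args) throws InterruptedException {
           ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
           // Adapt a real scheduler to the one-shot-only interface for the demo.
           OneShotScheduler oneShot = (t, delay, unit) -> ses.schedule(t, delay, unit);
           CountDownLatch threeRuns = new CountDownLatch(3);
           AtomicBoolean cancel = schedulePeriodically(oneShot, threeRuns::countDown, 5);
           threeRuns.await(5, TimeUnit.SECONDS);
           cancel.set(true); // stop re-arming, e.g. from close()
           ses.shutdownNow();
           System.out.println("runs left: " + threeRuns.getCount());
       }
   }
   ```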
   





[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1090006663

   cc @mbalassi @dmvk 




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849583201


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;

Review Comment:
   It will be needed in later PRs but for now it can be dropped.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();
+                                                    LOG.debug("TGT renewed successfully");
+                                                } catch (Exception e) {
+                                                    LOG.error("Error while renewing TGT", e);

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849591137


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   I've started to have a look...





[GitHub] [flink] dmvk commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r874491169


##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java:
##########
@@ -471,8 +472,16 @@ private void verifyNoFileIsRegisteredToDeleteOnExitHook() {
             Class<?> clazz = Class.forName("java.io.DeleteOnExitHook");
             Field field = clazz.getDeclaredField("files");
             field.setAccessible(true);
-            LinkedHashSet files = (LinkedHashSet) field.get(null);
-            assertTrue(files.isEmpty());
+            LinkedHashSet<String> files = (LinkedHashSet<String>) field.get(null);
+            boolean fileFound = false;
+            // Mockito adds mockitoboot*.jar file by default so put it to whitelist

Review Comment:
   D&I 🙈
   
   ```suggestion
               // Mockito automatically registers mockitoboot*.jar for on-exit removal. Verify that there are no other files registered.
   ```
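   The check described by the suggested comment could be factored into a small helper that tolerates only Mockito's boot jars. This is a minimal sketch, not the test's actual code. The class and method names are hypothetical, and the helper takes a plain set of paths instead of reading `java.io.DeleteOnExitHook` via reflection.

   ```java
   import java.io.File;
   import java.util.Set;

   /** Hypothetical helper for the on-exit deletion check in FileUploadHandlerITCase. */
   final class DeleteOnExitCheck {
       private DeleteOnExitCheck() {}

       /** Returns true if every registered path points to a Mockito boot jar (mockitoboot*.jar). */
       static boolean onlyMockitoBootJarsRegistered(Set<String> registeredFiles) {
           for (String path : registeredFiles) {
               // Compare on the file name only, since the temp directory differs between machines.
               String name = new File(path).getName();
               if (!(name.startsWith("mockitoboot") && name.endsWith(".jar"))) {
                   return false;
               }
           }
           return true;
       }
   }
   ```

   Matching on the file name rather than the full path keeps the check independent of the temporary directory used on a given machine.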





[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1126192259

   @flinkbot run azure




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r855078086


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   The first try failed w/ mockito 3.4.0 because I ran into the [following](https://github.com/mockito/mockito/commit/12ba5936b736a2886220f9ae4a6492558dba4e14) issue.
   Upgraded to the latest patch version of mockito (3.4.6) to have all such fixes.





[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1125930126

   As suggested I've created FLINK-27605.




[GitHub] [flink] dmvk commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1128532813

   Just a minor thing: by linking FLINK-27605 I meant having a separate commit with the Mockito upgrade.
   
   Basically the PR would have two commits:
   ```
   [FLINK-26043][runtime][security] Add periodic kerberos relogin to Ker…
   [FLINK-27605] .... upgrade mockito
   ```
   
   Sorry for the confusion 🙈




[GitHub] [flink] dmvk commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r872200266


##########
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##########
@@ -97,13 +98,14 @@
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 /** Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle. */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+@PrepareForTest(FlinkKinesisConsumer.class)
 public class FlinkKinesisConsumerTest extends TestLogger {

Review Comment:
   How does this `Kinesis` change relate to the Kerberos effort?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,25 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService ioExecutor;

Review Comment:
   `@Nullable`



##########
pom.xml:
##########
@@ -125,7 +125,7 @@ under the License.
 		<junit4.version>4.13.2</junit4.version>
 		<junit5.version>5.8.1</junit5.version>
 		<archunit.version>0.22.0</archunit.version>
-		<mockito.version>2.21.0</mockito.version>
+		<mockito.version>3.4.6</mockito.version>

Review Comment:
   Version upgrades should have a separate issue.
   
   cc @zentol 



##########
pom.xml:
##########
@@ -223,6 +223,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-inline</artifactId>
+			<version>${mockito.version}</version>
+			<type>jar</type>
+			<scope>test</scope>
+		</dependency>

Review Comment:
   We try to push back against using `Mockito / ...`-based mocks, as these come with a high maintenance cost. I know some parts of the code still use this approach, but it would be great to avoid it in new code.
   
   Adding new extended capabilities to `Mockito` seems to be heading in the opposite direction.





[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1126051696

   @flinkbot run azure




[GitHub] [flink] dmvk commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1097751263

   > Correct me if I'm wrong but AFAIK Flink must avoid breaking changes between minor releases.
   > Having true as default for KERBEROS_FETCH_DELEGATION_TOKEN may or may not be good but is a must if we want to
   > keep compatibility. All in all I'm open to change the default but from user perspective it would require minor migration.
   
   As long as this would be discussed on the mailing list, it should IMO be ok-ish as it's a change to the configuration and doesn't break any APIs.
   
   All in all, if we just go ahead with 2), it should be fine not to do that, as users are already used to the behavior (even though it's a bit weird in my opinion).
   
   > Just a side note: all examples are using info level messages, so I think it would be good to keep this pattern if you don't have any objection.
   
   I'm not sure I interpret this sentence correctly. Are you suggesting that we should use the info level instead of warn for the message? If so, I'm OK with that 👍




[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1097737082

   Correct me if I'm wrong but AFAIK Flink must avoid breaking changes between minor releases.
   Having `true` as the default for `KERBEROS_FETCH_DELEGATION_TOKEN` may or may not be good, but it is a must if we want to keep compatibility. All in all, I'm open to changing the default, but from a user perspective it would require a minor migration.
   
   Considering this, and the fact that Flink already has a pattern for bullet point 2, I would vote for that.
   We can print a meaningful message explaining what that actually means. Just a side note: all examples are using info level messages, so I think it would be good to keep this pattern if you don't have any objection.
   




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849363213


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   I've tested it on a cluster and it works like a charm, so I'm closing this discussion.





[GitHub] [flink] dmvk commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849445045


##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -424,12 +424,21 @@ public void start() throws Exception {
 
                 heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 
-                delegationTokenManager =
-                        configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
-                                        && HadoopDependency.isHadoopCommonOnClasspath(
-                                                getClass().getClassLoader())
-                                ? new KerberosDelegationTokenManager(configuration)
-                                : new NoOpDelegationTokenManager();
+                if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
+                    if (HadoopDependency.isHadoopCommonOnClasspath(getClass().getClassLoader())) {
+                        delegationTokenManager =
+                                new KerberosDelegationTokenManager(
+                                        configuration,
+                                        commonRpcService.getScheduledExecutor(),
+                                        ioExecutor);
+                    } else {
+                        LOG.info(
+                                "Cannot use kerberos delegation token manager because Hadoop cannot be found in the Classpath.");
+                        delegationTokenManager = new NoOpDelegationTokenManager();
+                    }
+                } else {
+                    delegationTokenManager = new NoOpDelegationTokenManager();
+                }

Review Comment:
   Can you please hide this behind a static method factory (eg. in `DelegationTokenManagerUtils`) so it can be reused by both `MiniCluster` and `ClusterEntrypoint`?
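   A factory along the lines suggested above might look like the following sketch. Everything in it is hypothetical: `TokenManager` and `NoOpTokenManager` are minimal stand-ins for `DelegationTokenManager` and `NoOpDelegationTokenManager`, the two booleans stand in for `SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN` and `HadoopDependency.isHadoopCommonOnClasspath(...)`, and `java.util.logging` replaces Flink's SLF4J logger.

   ```java
   import java.util.function.Supplier;
   import java.util.logging.Logger;

   /** Minimal stand-in for the delegation token manager interface. */
   interface TokenManager {
       void start() throws Exception;

       void stop();
   }

   /** Does nothing; used when delegation tokens are disabled or Hadoop is missing. */
   final class NoOpTokenManager implements TokenManager {
       @Override
       public void start() {}

       @Override
       public void stop() {}
   }

   /** Hypothetical shared factory for both MiniCluster and ClusterEntrypoint. */
   final class TokenManagerFactory {
       private static final Logger LOG = Logger.getLogger(TokenManagerFactory.class.getName());

       private TokenManagerFactory() {}

       static TokenManager create(
               boolean fetchDelegationTokens,
               boolean hadoopOnClasspath,
               Supplier<TokenManager> kerberosManagerFactory) {
           if (!fetchDelegationTokens) {
               return new NoOpTokenManager();
           }
           if (!hadoopOnClasspath) {
               LOG.info(
                       "Cannot use kerberos delegation token manager because Hadoop cannot be found in the Classpath.");
               return new NoOpTokenManager();
           }
           // Only reached when both checks pass, so the Kerberos manager is built lazily.
           return kerberosManagerFactory.get();
       }
   }
   ```

   Passing the Kerberos manager as a `Supplier` means it is only constructed once both checks have passed, so a missing Hadoop dependency never reaches its constructor.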



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;
+
+    private ScheduledFuture<?> tgtRenewalFuture;

Review Comment:
   ```suggestion
       @Nullable
       private ScheduledFuture<?> tgtRenewalFuture;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();
+                                                    LOG.debug("TGT renewed successfully");
+                                                } catch (Exception e) {
+                                                    LOG.error("Error while renewing TGT", e);

Review Comment:
   warn



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;
+
+    private ScheduledFuture<?> tgtRenewalFuture;
+
+    public KerberosDelegationTokenManager(
+            Configuration configuration,
+            @Nullable ScheduledExecutor scheduledExecutor,
+            @Nullable ExecutorService executorService) {

Review Comment:
   It feels off that this could be null just because of one usage of `DelegationTokenManager#obtainDelegationTokens` in the `YarnClusterDescriptor`. Could it be that this method doesn't really belong in this interface?
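   One way to act on this would be to split the interface so that the client-side caller depends only on token retrieval, while the executors become mandatory for the lifecycle part. This is a sketch under assumptions: the interface and class names are hypothetical, and `Map<String, byte[]>` stands in for Hadoop's `Credentials`.

   ```java
   import java.util.Map;

   /** Hypothetical: one-shot token retrieval, usable by a client without any executors. */
   interface DelegationTokenObtainer {
       void obtainDelegationTokens(Map<String, byte[]> credentials) throws Exception;
   }

   /** Hypothetical: lifecycle part that needs executors and runs only inside the cluster. */
   interface DelegationTokenLifecycle {
       void start() throws Exception;

       void stop();
   }

   /** Illustrative obtainer that needs no scheduler or I/O executor at all. */
   final class StaticTokenObtainer implements DelegationTokenObtainer {
       @Override
       public void obtainDelegationTokens(Map<String, byte[]> credentials) {
           // A real provider would contact the service; here a fixed token is stored.
           credentials.put("example-service", new byte[] {1, 2, 3});
       }
   }
   ```

   With such a split, `YarnClusterDescriptor` would only need a `DelegationTokenObtainer`, and the executors passed to the manager's constructor would no longer have to be `@Nullable`.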



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;

Review Comment:
   this doesn't seem to need an instance variable



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;

Review Comment:
   `ioExecutor` would be more explicit as it's commonly used throughout the JM components.
   ```suggestion
       private final ExecutorService ioExecutor;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   Can you please write a test case for the periodic renewal? (`ManuallyTriggeredScheduledExecutorService` is usually pretty helpful for this)
   
   The main reason I'm bringing this up is that this still uses Hadoop's UGI internals, which makes this mostly untestable. My feeling is that `KerberosRenewalPossibleProvider` should become something that completely hides the Hadoop internals from the DTM (e.g. a `KerberosClient` with two implementations, `HadoopKerberosClient` and `TestingKerberosClient`; see e.g. `TestingDeclarativeSlotPool` for how "testing" implementations are usually created in Flink).
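   A sketch of the testable structure described above, assuming a hypothetical `KerberosClient` interface (the reviewer's proposed name) whose production implementation would wrap `UserGroupInformation`. `ManualScheduler` is a tiny hypothetical stand-in for Flink's `ManuallyTriggeredScheduledExecutorService` and does not reproduce that class's API; `reloginIfNeeded` and `RenewalStarter` are likewise illustrative names.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical abstraction that hides Hadoop's UGI from the token manager. */
   interface KerberosClient {
       boolean isFromKeytab();

       void reloginIfNeeded() throws Exception;
   }

   /** Test double in the style of Flink's Testing* classes: behavior is driven by plain fields. */
   final class TestingKerberosClient implements KerberosClient {
       boolean fromKeytab = true;
       int reloginCount = 0;

       @Override
       public boolean isFromKeytab() {
           return fromKeytab;
       }

       @Override
       public void reloginIfNeeded() {
           reloginCount++;
       }
   }

   /** Minimal manually triggered scheduler: collects periodic tasks and runs them on demand. */
   final class ManualScheduler {
       private final List<Runnable> periodicTasks = new ArrayList<>();

       void scheduleAtFixedRate(Runnable task) {
           periodicTasks.add(task);
       }

       void triggerPeriodicTasks() {
           periodicTasks.forEach(Runnable::run);
       }
   }

   final class RenewalStarter {
       private RenewalStarter() {}

       static void start(KerberosClient client, ManualScheduler scheduler) {
           if (!client.isFromKeytab()) {
               return; // only keytab-based logins are renewed periodically
           }
           scheduler.scheduleAtFixedRate(
                   () -> {
                       try {
                           client.reloginIfNeeded();
                       } catch (Exception e) {
                           // A real implementation would log a warning and retry on the next trigger.
                       }
                   });
       }
   }
   ```

   A test can then trigger the periodic task twice and assert that exactly two relogins happened, without touching Hadoop or Mockito.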



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();
+                                                    LOG.debug("TGT renewed successfully");
+                                                } catch (Exception e) {
+                                                    LOG.error("Error while renewing TGT", e);
+                                                }
+                                            }),
+                            tgtRenewalPeriod,
+                            tgtRenewalPeriod,
+                            TimeUnit.MILLISECONDS);
+            LOG.debug("Credential renewal task started and reoccur in {} ms", tgtRenewalPeriod);

Review Comment:
   There is some inconsistency in the log messages: they mix references to TGT and credentials, and these should be unified. It would also be great to add a note to the class javadoc explaining what TGT (ticket-granting ticket) stands for.
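A minimal sketch of what unified wording could look like. It is decoupled from Hadoop so it runs standalone, and it is not the PR's code. `TgtRelogin` and `TgtRenewalTask` are hypothetical names. `TgtRelogin.checkTgtAndRelogin()` stands in for `UserGroupInformation.checkTGTAndReloginFromKeytab()`, and `java.util.logging` replaces SLF4J. The scheduling shape, where a scheduler hands each run to a separate executor, mirrors the diff above. Every message says "TGT", and the class javadoc spells out the acronym.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for UserGroupInformation#checkTGTAndReloginFromKeytab(). */
interface TgtRelogin {
    void checkTgtAndRelogin() throws Exception;
}

/**
 * Periodically renews the Kerberos TGT (ticket-granting ticket): the ticket a principal
 * obtains from the KDC at login and later presents to request service tickets.
 */
final class TgtRenewalTask {
    private static final Logger LOG = Logger.getLogger(TgtRenewalTask.class.getName());

    private TgtRenewalTask() {}

    static ScheduledFuture<?> start(
            ScheduledExecutorService scheduler,
            Executor ioExecutor,
            TgtRelogin relogin,
            long periodMillis) {
        LOG.fine("Starting TGT renewal task");
        ScheduledFuture<?> future =
                scheduler.scheduleAtFixedRate(
                        // The scheduler thread only hands off the work; the (possibly slow)
                        // KDC round trip runs on ioExecutor.
                        () -> ioExecutor.execute(() -> renewOnce(relogin)),
                        periodMillis,
                        periodMillis,
                        TimeUnit.MILLISECONDS);
        LOG.fine("TGT renewal task started, recurring every " + periodMillis + " ms");
        return future;
    }

    static void renewOnce(TgtRelogin relogin) {
        try {
            LOG.fine("Renewing TGT");
            relogin.checkTgtAndRelogin();
            LOG.fine("TGT renewed successfully");
        } catch (Exception e) {
            // Log and carry on; the next period simply tries again.
            LOG.log(Level.SEVERE, "Error while renewing TGT", e);
        }
    }
}
```

Because a failed attempt is caught and logged, a temporarily unreachable KDC only produces error logs. The task keeps recurring.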




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847414464


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =

Review Comment:
   Since we plan to use existing executors, this code will be deleted.
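A minimal sketch of the "use existing executors" direction. The manager receives caller-owned executors in `start()` instead of building its own `ScheduledThreadPoolExecutor`. `BorrowingRenewalManager` is a hypothetical name, and the `BooleanSupplier` stands in for the renewal-possible check. The guard messages mirror the `checkState`/`checkNotNull` calls in the first diff, but this is an illustration, not Flink's implementation.

```java
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical manager that borrows caller-owned executors instead of creating its own
 * ScheduledThreadPoolExecutor, so it has no threads of its own to manage.
 */
final class BorrowingRenewalManager {
    private final BooleanSupplier renewalPossible;
    private final Runnable renewal;
    private final long periodMillis;
    private ScheduledFuture<?> renewalFuture; // null while not running

    BorrowingRenewalManager(BooleanSupplier renewalPossible, Runnable renewal, long periodMillis) {
        this.renewalPossible = Objects.requireNonNull(renewalPossible);
        this.renewal = Objects.requireNonNull(renewal);
        this.periodMillis = periodMillis;
    }

    synchronized void start(ScheduledExecutorService scheduler, Executor ioExecutor) {
        Objects.requireNonNull(scheduler, "Scheduled executor must not be null");
        Objects.requireNonNull(ioExecutor, "Executor service must not be null");
        if (renewalFuture != null) {
            throw new IllegalStateException("Manager is already started");
        }
        if (!renewalPossible.getAsBoolean()) {
            return; // nothing to renew, so no task is scheduled
        }
        renewalFuture =
                scheduler.scheduleAtFixedRate(
                        () -> ioExecutor.execute(renewal),
                        periodMillis,
                        periodMillis,
                        TimeUnit.MILLISECONDS);
    }

    synchronized boolean isRunning() {
        return renewalFuture != null;
    }

    synchronized void stop() {
        if (renewalFuture != null) {
            renewalFuture.cancel(false);
            renewalFuture = null;
        }
        // The executors are owned by the caller, so they are deliberately not shut down here.
    }
}
```

Stopping only cancels the scheduled future. The shared executors keep serving their other users.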




[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1095077528

   > Should the obtainDelegationTokens(Credentials credentials) be implemented? Maybe it could be a shared code path for the renewal (the one time renewal that is scheduled)?
   
   Not sure what you mean here. TGT renewal and obtaining tokens are quite different in several respects:
   * They obtain different things
   * They do so at different frequencies
   
   Could you please elaborate on this?
   



[GitHub] [flink] flinkbot commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1089968717

   ## CI report:
   
   * fdd12a3471523db3821f24126421e362b67c52ff UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1125888070

   > My only concern is around changes to the Mockito infrastructure (should be a separate story) and addition of the new Mockito based tests.
   > Would it be possible to avoid that?
   
   I'm fine with any approach, though my personal opinion is that in this exceptional case static function mocking would be the least painful solution, with very compact code. Please see the details [here](https://github.com/apache/flink/pull/19372#discussion_r872223118). I'm waiting for opinions so we can reach an agreement, and then we'll go in that direction.
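For comparison, a minimal sketch of the non-mocking alternative. The static `UserGroupInformation.getCurrentUser()` lookup is hidden behind a small interface, so tests can pass a lambda. `CurrentUserInfo` and `KeytabRenewalPossibleProvider` are hypothetical names. The rule "renewal is possible only for keytab logins" is an assumption inferred from the test names in this PR (`isRenewalPossibleMustGiveBackFalseByDefault`, `isRenewalPossibleMustGiveBackTrueWhenKeytab`), not the merged logic. In production, the interface could be backed by `UserGroupInformation.getCurrentUser().isFromKeytab()`.

```java
import java.io.IOException;
import java.util.Objects;

/** Hypothetical seam over the static UserGroupInformation.getCurrentUser() lookup. */
@FunctionalInterface
interface CurrentUserInfo {
    boolean isFromKeytab() throws IOException;
}

/** Hypothetical provider; assumes renewal is possible only for keytab-based logins. */
final class KeytabRenewalPossibleProvider {
    private final CurrentUserInfo currentUser;

    KeytabRenewalPossibleProvider(CurrentUserInfo currentUser) {
        this.currentUser = Objects.requireNonNull(currentUser);
    }

    boolean isRenewalPossible() throws IOException {
        return currentUser.isFromKeytab();
    }
}
```

A test then needs no `PowerMockito.mockStatic`: `new KeytabRenewalPossibleProvider(() -> true)` is enough. The cost is one extra indirection in production code.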



[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1125967453

   I've done the following, and I think every suggestion has been addressed:
   * Squashed all the commits
   * Added the last missing `@Nullable`s
   * Changed the PR description to reflect Mockito related changes
   * Linked this PR to FLINK-27605
   
   Please let me know if I've missed anything.
   



[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1126125435

   @flinkbot run azure



[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1126477747

   @flinkbot run azure



[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847376435


##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -79,4 +93,48 @@ public void testAllProvidersLoaded() {
         assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
     }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackFalseByDefault() throws IOException {
+        UserGroupInformation ugi = PowerMockito.mock(UserGroupInformation.class);
+        PowerMockito.mockStatic(UserGroupInformation.class);
+        when(UserGroupInformation.getCurrentUser()).thenReturn(ugi);
+
+        ExceptionThrowingDelegationTokenProvider.enabled = false;
+        Configuration configuration = new Configuration();
+        KerberosDelegationTokenManager delegationTokenManager =
+                new KerberosDelegationTokenManager(configuration);
+
+        assertFalse(delegationTokenManager.isRenewalPossible());
+    }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackTrueWhenKeytab() throws IOException {

Review Comment:
   Fixed.




[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1091893147

   @flinkbot run azure



[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1091623030

   I've just finished testing on minikube, and here is the log showing that the renewal succeeds. Kerberos logging is also enabled so the details are visible.
   [dt.log](https://github.com/apache/flink/files/8442406/dt.log)
   



[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849249180


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   That makes complete sense; `commonRpcService` is also available in the `MiniCluster` area. Making the changes right now...




[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #19372:
URL: https://github.com/apache/flink/pull/19372#issuecomment-1134321938

   @dmvk, thanks for improving the PR and taking care of it!

