Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/11 12:43:55 UTC

[GitHub] [flink] dmvk commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager

dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847258427


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   I'd love to avoid adding a new executor here (in general, we try to avoid thread pollution where possible). We should be able to reuse the executors that are already available to the `ResourceManager`:
   - `ComponentMainThreadExecutor` (the main thread of the ResourceManager endpoint; it can be used for scheduling, but it must not run blocking runnables, otherwise we'd disrupt the endpoint's availability)
   - `ioExecutor` (for blocking, I/O-heavy calls)
   
   Basically, the `start` method would become something along these lines:
   
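   ```java
           final long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
           // The renewal future needs to be cancelled during close.
           // schedule() runs the task once, after tgtRenewalPeriod; the blocking relogin itself
           // is handed off to the ioExecutor so that the main thread is never blocked.
           final ScheduledFuture<?> renewalFuture =
                   mainThreadExecutor.schedule(
                           () ->
                                   ioExecutor.execute(
                                           () -> {
                                               // Todo...
                                           }),
                           tgtRenewalPeriod,
                           TimeUnit.MILLISECONDS);
   ```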
   
   WDYT?
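A minimal sketch of how the suggestion above could be turned into a periodic renewal, assuming plain JDK executors stand in for Flink's `ComponentMainThreadExecutor` and the `ioExecutor`. The class `TgtRenewalScheduler` and its methods are hypothetical, and the blocking relogin is passed in as a `Runnable`. Scheduling and cancellation happen only on the main-thread executor, the blocking work runs on the I/O executor, and each finished run hops back to the main thread to schedule the next one.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: periodic TGT renewal without a dedicated thread pool.
 * mainThreadExecutor stands in for ComponentMainThreadExecutor (never block it),
 * ioExecutor for the ResourceManager's ioExecutor (blocking calls are fine there).
 */
final class TgtRenewalScheduler {
    private final ScheduledExecutorService mainThreadExecutor;
    private final ExecutorService ioExecutor;
    private final long renewalPeriodMillis;
    private final Runnable blockingRelogin;

    // Written on the main thread, read on I/O threads.
    private volatile boolean stopped;
    // Only accessed on the main thread, so no further synchronization is needed.
    private ScheduledFuture<?> renewalFuture;

    TgtRenewalScheduler(
            final ScheduledExecutorService mainThreadExecutor,
            final ExecutorService ioExecutor,
            final long renewalPeriodMillis,
            final Runnable blockingRelogin) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.ioExecutor = ioExecutor;
        this.renewalPeriodMillis = renewalPeriodMillis;
        this.blockingRelogin = blockingRelogin;
    }

    /** Must be called on the main thread. */
    void start() {
        scheduleNextRenewal();
    }

    /** Must be called on the main thread, e.g. when the manager is closed. */
    void stop() {
        stopped = true;
        if (renewalFuture != null) {
            renewalFuture.cancel(false);
        }
    }

    // Runs on the main thread: only schedules, never blocks.
    private void scheduleNextRenewal() {
        if (stopped) {
            return;
        }
        renewalFuture =
                mainThreadExecutor.schedule(
                        () -> ioExecutor.execute(this::renewAndReschedule),
                        renewalPeriodMillis,
                        TimeUnit.MILLISECONDS);
    }

    // Runs on the I/O executor.
    private void renewAndReschedule() {
        try {
            blockingRelogin.run();
        } finally {
            if (!stopped) {
                // Hop back to the main thread before touching renewalFuture.
                mainThreadExecutor.execute(this::scheduleNextRenewal);
            }
        }
    }
}
```

Rescheduling after each completed run, rather than using a fixed-rate schedule, means a slow relogin can never overlap with the next one; `stop()` plays the role of cancelling the renewal future during close.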
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =

Review Comment:
   nit: Please mark variables as `final` where possible.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -79,4 +93,48 @@ public void testAllProvidersLoaded() {
         assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
     }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackFalseByDefault() throws IOException {
+        UserGroupInformation ugi = PowerMockito.mock(UserGroupInformation.class);
+        PowerMockito.mockStatic(UserGroupInformation.class);
+        when(UserGroupInformation.getCurrentUser()).thenReturn(ugi);
+
+        ExceptionThrowingDelegationTokenProvider.enabled = false;
+        Configuration configuration = new Configuration();
+        KerberosDelegationTokenManager delegationTokenManager =
+                new KerberosDelegationTokenManager(configuration);
+
+        assertFalse(delegationTokenManager.isRenewalPossible());
+    }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackTrueWhenKeytab() throws IOException {

Review Comment:
   We should use JUnit 5's `@TempDir` support for temporary files, e.g.:
   
   ```suggestion
       public void isRenewalPossibleMustGiveBackTrueWhenKeytab(@TempDir Path tmpDir) throws IOException {
           final Path file = Files.createFile(tmpDir.resolve("test.keytab"));
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -20,13 +20,27 @@
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;

Review Comment:
   We shouldn't be adding any new tests that use `Mockito` for mocking. The reasoning behind this decision is explained in more detail here: https://docs.google.com/presentation/d/1fZlTjOJscwmzYadPGl23aui6zopl94Mn5smG-rB0qT8/edit#slide=id.g2fa61f7d00_0_99
   
   Since `UserGroupInformation` in its current form requires mocking static methods, you'll most likely need to hide it behind a new interface that can be replaced with a test implementation.
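One possible shape for such a seam, as a sketch only: the interface `KerberosUserInfo`, the class `RenewalPossibilityChecker` and the test double `TestingKerberosUserInfo` are hypothetical names, not existing Flink API. The check mirrors `isRenewalPossible()` from the diff, but it asks the interface instead of calling the static `UserGroupInformation.getCurrentUser()`, so a test can pass a hand-written implementation instead of using PowerMock.

```java
import java.io.IOException;

/** Hypothetical seam hiding the static UserGroupInformation calls. */
interface KerberosUserInfo {
    // In production this could delegate to Hadoop, e.g.
    // () -> UserGroupInformation.getCurrentUser().hasKerberosCredentials()
    // (not included here so the sketch compiles without Hadoop on the classpath).
    boolean hasKerberosCredentials() throws IOException;
}

/** Same decision as isRenewalPossible() in the diff, without static Hadoop calls. */
final class RenewalPossibilityChecker {
    private final String keytab;
    private final String principal;
    private final boolean useTicketCache;
    private final KerberosUserInfo userInfo;

    RenewalPossibilityChecker(
            final String keytab,
            final String principal,
            final boolean useTicketCache,
            final KerberosUserInfo userInfo) {
        this.keytab = keytab;
        this.principal = principal;
        this.useTicketCache = useTicketCache;
        this.userInfo = userInfo;
    }

    boolean isRenewalPossible() throws IOException {
        if (!isBlank(keytab) && !isBlank(principal)) {
            return true; // login from keytab is possible
        }
        // Short-circuits: the user info is only consulted when the ticket cache is enabled.
        return useTicketCache && userInfo.hasKerberosCredentials();
    }

    // Approximates StringUtils.isBlank so the sketch has no extra dependencies.
    private static boolean isBlank(final String s) {
        return s == null || s.trim().isEmpty();
    }
}

/** Test double: a plain implementation, no mocking framework needed. */
final class TestingKerberosUserInfo implements KerberosUserInfo {
    private final boolean hasCredentials;

    TestingKerberosUserInfo(final boolean hasCredentials) {
        this.hasCredentials = hasCredentials;
    }

    @Override
    public boolean hasKerberosCredentials() {
        return hasCredentials;
    }
}
```

In production, the manager would receive an implementation backed by `UserGroupInformation`; a test constructs the checker directly with `TestingKerberosUserInfo`.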
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
+        // By default, a cancelled task is not automatically removed from the work queue until its
+        // delay elapses. We have to enable it manually.
+        renewalExecutor.setRemoveOnCancelPolicy(true);
+
+        startTGTRenewal();
+    }
+
+    @VisibleForTesting
+    boolean isRenewalPossible() throws IOException {
+        if (!StringUtils.isBlank(securityConfiguration.getKeytab())
+                && !StringUtils.isBlank(securityConfiguration.getPrincipal())) {
+            LOG.debug("Login from keytab is possible");
+            return true;
+        }
+        LOG.debug("Login from keytab is NOT possible");
+
+        if (securityConfiguration.useTicketCache()
+                && UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
+            LOG.debug("Login from ticket cache is possible");
+            return true;
+        }
+        LOG.debug("Login from ticket cache is NOT possible");
+
+        return false;
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            Runnable tgtRenewalTask =
+                    () -> {
+                        try {
+                            LOG.debug("Renewing TGT");
+                            currentUser.checkTGTAndReloginFromKeytab();
+                            LOG.debug("TGT renewed successfully");
+                        } catch (Exception e) {
+                            LOG.error("Error while renewing TGT", e);

Review Comment:
   Should this be logged as a warning rather than an error?
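For context on the log level: the renewal task in the diff catches the exception itself. That matters if the task is run by a periodic `ScheduledExecutorService` schedule such as `scheduleAtFixedRate`, because the JDK suppresses all subsequent executions once one execution throws. A single failed attempt is then recoverable on the next run, which argues for WARN. A minimal sketch, assuming `java.util.logging` in place of SLF4J and a hypothetical `ReloginAction` standing in for `currentUser.checkTGTAndReloginFromKeytab()`:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical renewal task that logs failed attempts at WARNING level and keeps going. */
final class WarnOnFailureRenewalTask implements Runnable {
    private static final Logger LOG =
            Logger.getLogger(WarnOnFailureRenewalTask.class.getName());

    /** Hypothetical stand-in for currentUser.checkTGTAndReloginFromKeytab(). */
    interface ReloginAction {
        void relogin() throws Exception;
    }

    private final ReloginAction action;
    // Counts failed attempts; atomic because the scheduler thread writes and callers read it.
    private final AtomicInteger failures = new AtomicInteger();

    WarnOnFailureRenewalTask(final ReloginAction action) {
        this.action = action;
    }

    @Override
    public void run() {
        try {
            action.relogin();
        } catch (Exception e) {
            failures.incrementAndGet();
            // Not rethrown: an escaping exception would cancel a periodic schedule,
            // and the next scheduled attempt may well succeed.
            LOG.log(Level.WARNING, "Error while renewing TGT, retrying on the next run", e);
        }
    }

    int failures() {
        return failures.get();
    }
}
```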



##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -20,13 +20,27 @@
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;

Review Comment:
   This should use JUnit 5 (`org.junit.jupiter.api.Test`) instead of JUnit 4.



-- 
This is an automated message from the Apache Git Service.