You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/03/29 03:01:05 UTC

[incubator-uniffle] branch master updated: [#772] fix(kerberos): cache proxy user ugi to avoid memory leak (#773)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 33e0d23e [#772] fix(kerberos): cache proxy user ugi to avoid memory leak (#773)
33e0d23e is described below

commit 33e0d23ed45740cb0619444cc4b9b2dd7d33cf6d
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Wed Mar 29 11:00:58 2023 +0800

    [#772] fix(kerberos): cache proxy user ugi to avoid memory leak (#773)
    
    ### What changes were proposed in this pull request?
    
    1. Cache the proxy user UGI to avoid a memory leak.
    
    ### Why are the changes needed?
    
    Fix: #772
    
    Too many Hadoop filesystem instances are created and kept in the filesystem cache,
    which causes a memory leak in the shuffle server.
    
    As we know, the filesystem cache's key is built from the scheme, authority and UGI.
    The scheme and authority do not change between calls. But for the UGI, if we invoke
    createProxyUser, it always creates a new one, which means that every invocation of
    `FileSystem.get()` adds a new cache entry because the key is different.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    1. Existing UTs
    2. Added tests
---
 .../common/security/HadoopSecurityContext.java     | 24 +++++++++++++++++++++-
 .../common/security/HadoopSecurityContextTest.java | 18 ++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java
index cb1da3c7..d7ddd275 100644
--- a/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java
+++ b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java
@@ -19,16 +19,19 @@ package org.apache.uniffle.common.security;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 
 public class HadoopSecurityContext implements SecurityContext {
@@ -38,6 +41,12 @@ public class HadoopSecurityContext implements SecurityContext {
   private UserGroupInformation loginUgi;
   private ScheduledExecutorService refreshScheduledExecutor;
 
+  // The purpose of the proxy user ugi cache is to prevent the creation of
+  // multiple cache keys for the same user, scheme, and authority in the Hadoop filesystem.
+  // Without this cache, large amounts of unnecessary filesystem instances could be stored in memory,
+  // leading to potential memory leaks. For more information on this issue, refer to #706.
+  private Map<String, UserGroupInformation> proxyUserUgiPool;
+
   public HadoopSecurityContext(
       String krb5ConfPath,
       String keytabFile,
@@ -72,6 +81,7 @@ public class HadoopSecurityContext implements SecurityContext {
         refreshIntervalSec,
         refreshIntervalSec,
         TimeUnit.SECONDS);
+    proxyUserUgiPool = JavaUtils.newConcurrentMap();
   }
 
   private void authRefresh() {
@@ -91,8 +101,10 @@ public class HadoopSecurityContext implements SecurityContext {
 
     // Run with the proxy user.
     if (!user.equals(loginUgi.getShortUserName())) {
+      UserGroupInformation proxyUserUgi =
+          proxyUserUgiPool.computeIfAbsent(user, x -> UserGroupInformation.createProxyUser(x, loginUgi));
       return executeWithUgiWrapper(
-          UserGroupInformation.createProxyUser(user, loginUgi),
+          proxyUserUgi,
           securedCallable
       );
     }
@@ -110,10 +122,20 @@ public class HadoopSecurityContext implements SecurityContext {
     return ugi.doAs((PrivilegedExceptionAction<T>) callable::call);
   }
 
+  // Only for tests
+  @VisibleForTesting
+  Map<String, UserGroupInformation> getProxyUserUgiPool() {
+    return proxyUserUgiPool;
+  }
+
   @Override
   public void close() throws IOException {
     if (refreshScheduledExecutor != null) {
       refreshScheduledExecutor.shutdown();
     }
+    if (proxyUserUgiPool != null) {
+      proxyUserUgiPool.clear();
+      proxyUserUgiPool = null;
+    }
   }
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java
index 31866ec9..f4f79f76 100644
--- a/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java
@@ -18,10 +18,13 @@
 package org.apache.uniffle.common.security;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
@@ -66,13 +69,28 @@ public class HadoopSecurityContextTest extends KerberizedHdfsBase {
 
       // case3: run by the proxy user
       Path pathWithAlexUser = new Path("/alex/HadoopSecurityContextTest");
+      AtomicReference<UserGroupInformation> ugi1 = new AtomicReference<>();
       context.runSecured("alex", (Callable<Void>) () -> {
+        ugi1.set(UserGroupInformation.getCurrentUser());
         kerberizedHdfs.getFileSystem().mkdirs(pathWithAlexUser);
         return null;
       });
       fileStatus = kerberizedHdfs.getFileSystem().getFileStatus(pathWithAlexUser);
       assertEquals("alex", fileStatus.getOwner());
 
+      // case4: run by the proxy user again, it will always return the same
+      // ugi and filesystem instance.
+      AtomicReference<UserGroupInformation> ugi2 = new AtomicReference<>();
+      context.runSecured("alex", (Callable<Void>) () -> {
+        ugi2.set(UserGroupInformation.getCurrentUser());
+        return null;
+      });
+      assertTrue(ugi1.get() == ugi2.get());
+      assertTrue(ugi1.get() == context.getProxyUserUgiPool().get("alex"));
+
+      FileSystem fileSystem1 = context.runSecured("alex", () -> FileSystem.get(kerberizedHdfs.getConf()));
+      FileSystem fileSystem2 = context.runSecured("alex", () -> FileSystem.get(kerberizedHdfs.getConf()));
+      assertTrue(fileSystem1 == fileSystem2);
     }
   }