You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/29 01:05:07 UTC

[doris] branch master updated: [Enhancement](multi-catalog) try to reuse existed ugi. (#21274)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 884c908e25 [Enhancement](multi-catalog) try to reuse existed ugi. (#21274)
884c908e25 is described below

commit 884c908e25be74af734eb6b370708fb4524e177e
Author: Xiangyu Wang <du...@gmail.com>
AuthorDate: Thu Jun 29 09:04:59 2023 +0800

    [Enhancement](multi-catalog) try to reuse existed ugi. (#21274)
    
    Try to reuse an existing UGI at DFSFileSystem; otherwise, if we query an HMS table with more than ten thousand partitions, we will perform more than ten thousand login operations, and each login operation can cost hundreds of milliseconds according to my tests.
    Co-authored-by: 王翔宇 <wa...@360shuke.com>
---
 .../org/apache/doris/catalog/HdfsResource.java     |  1 +
 .../apache/doris/fs/remote/RemoteFileSystem.java   |  3 +-
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  | 70 ++++++++++++++--------
 3 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index d812d8ec53..cdfb169590 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -48,6 +48,7 @@ public class HdfsResource extends Resource {
     public static String HADOOP_USER_NAME = "hadoop.username";
     public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
     public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
+    public static String HADOOP_KERBEROS_AUTHORIZATION = "hadoop.security.authorization";
     public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
     public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
     public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index 7d87993733..ffe63f20ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -31,7 +31,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 public abstract class RemoteFileSystem extends PersistentFileSystem {
-    protected org.apache.hadoop.fs.FileSystem dfsFileSystem = null;
+    // this field will be visited by multi-threads, better use volatile qualifier
+    protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null;
 
     public RemoteFileSystem(String name, StorageBackend.StorageType type) {
         super(name, type);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 9f72595ad8..ce297ce920 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -29,6 +29,7 @@ import org.apache.doris.fs.operations.OpParams;
 import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,7 +55,6 @@ import java.security.PrivilegedAction;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class DFSFileSystem extends RemoteFileSystem {
 
@@ -82,42 +82,61 @@ public class DFSFileSystem extends RemoteFileSystem {
             conf.set(propEntry.getKey(), propEntry.getValue());
         }
 
-        UserGroupInformation ugi = getUgi(conf);
-        AtomicReference<Exception> exception = new AtomicReference<>();
+        boolean hasRelogin = false;
+        UserGroupInformation ugi;
+        try {
+            // try use current ugi first to avoid relogin
+            // because it may be a time-consuming task
+            ugi = UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+            LOG.warn("An IOException occurs when invoke "
+                    + "UserGroupInformation.getCurrentUser(), relogin immediately.", e);
+            ugi = doLogin(conf);
+            hasRelogin = true;
+        }
 
-        dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
+        do {
             try {
-                String username = properties.get(HdfsResource.HADOOP_USER_NAME);
-                if (username == null) {
-                    return FileSystem.get(new Path(remotePath).toUri(), conf);
-                } else {
-                    return FileSystem.get(new Path(remotePath).toUri(), conf, username);
+                dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
+                    try {
+                        String username = properties.get(HdfsResource.HADOOP_USER_NAME);
+                        return username == null
+                                    ? FileSystem.get(new Path(remotePath).toUri(), conf)
+                                    : FileSystem.get(new Path(remotePath).toUri(), conf, username);
+                    } catch (IOException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+                LOG.debug("Reuse current ugi for dfs, remote path: {}", remotePath);
+                break;
+            } catch (SecurityException e) {
+                LOG.warn("A SecurityException occurs when invoke ugi.doAs(), "
+                            + "relogin and retry immediately.", e);
+                if (hasRelogin) {
+                    throw new UserException(e);
                 }
-            } catch (Exception e) {
-                exception.set(e);
-                return null;
+                ugi = doLogin(conf);
+                hasRelogin = true;
             }
-        });
+        } while (true);
 
-        if (dfsFileSystem == null) {
-            LOG.error("errors while connect to " + remotePath, exception.get());
-            throw new UserException("errors while connect to " + remotePath, exception.get());
-        }
+        Preconditions.checkNotNull(dfsFileSystem);
         operations = new HDFSFileOperations(dfsFileSystem);
         return dfsFileSystem;
     }
 
-    private UserGroupInformation getUgi(Configuration conf) throws UserException {
-        String authentication = conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null);
-        if (AuthType.KERBEROS.getDesc().equals(authentication)) {
-            conf.set("hadoop.security.authorization", "true");
-            UserGroupInformation.setConfiguration(conf);
+    private UserGroupInformation doLogin(Configuration conf) throws UserException {
+        if (AuthType.KERBEROS.getDesc().equals(
+                    conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null))) {
+            conf.set(HdfsResource.HADOOP_KERBEROS_AUTHORIZATION, "true");
             String principal = conf.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL);
             String keytab = conf.get(HdfsResource.HADOOP_KERBEROS_KEYTAB);
+
+            UserGroupInformation.setConfiguration(conf);
             try {
                 UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
                 UserGroupInformation.setLoginUser(ugi);
-                LOG.info("kerberos authentication successful");
+                LOG.info("Login by kerberos authentication with principal: {}", principal);
                 return ugi;
             } catch (IOException e) {
                 throw new UserException(e);
@@ -128,7 +147,10 @@ public class DFSFileSystem extends RemoteFileSystem {
                 hadoopUserName = "hadoop";
                 LOG.debug(HdfsResource.HADOOP_USER_NAME + " is unset, use default user: hadoop");
             }
-            return UserGroupInformation.createRemoteUser(hadoopUserName);
+            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
+            UserGroupInformation.setLoginUser(ugi);
+            LOG.info("Login by proxy user, hadoop.username: {}", hadoopUserName);
+            return ugi;
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org