You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/29 01:05:07 UTC
[doris] branch master updated: [Enhancement](multi-catalog) try to reuse existed ugi. (#21274)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 884c908e25 [Enhancement](multi-catalog) try to reuse existed ugi. (#21274)
884c908e25 is described below
commit 884c908e25be74af734eb6b370708fb4524e177e
Author: Xiangyu Wang <du...@gmail.com>
AuthorDate: Thu Jun 29 09:04:59 2023 +0800
[Enhancement](multi-catalog) try to reuse existed ugi. (#21274)
Try to reuse an existing UGI in DFSFileSystem; otherwise, when we query an HMS table with more than ten thousand partitions, we will perform more than ten thousand login operations, and each login operation costs hundreds of milliseconds in my tests.
Co-authored-by: 王翔宇 <wa...@360shuke.com>
---
.../org/apache/doris/catalog/HdfsResource.java | 1 +
.../apache/doris/fs/remote/RemoteFileSystem.java | 3 +-
.../apache/doris/fs/remote/dfs/DFSFileSystem.java | 70 ++++++++++++++--------
3 files changed, 49 insertions(+), 25 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index d812d8ec53..cdfb169590 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -48,6 +48,7 @@ public class HdfsResource extends Resource {
public static String HADOOP_USER_NAME = "hadoop.username";
public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
+ public static String HADOOP_KERBEROS_AUTHORIZATION = "hadoop.security.authorization";
public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index 7d87993733..ffe63f20ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -31,7 +31,8 @@ import java.util.ArrayList;
import java.util.List;
public abstract class RemoteFileSystem extends PersistentFileSystem {
- protected org.apache.hadoop.fs.FileSystem dfsFileSystem = null;
+ // this field will be visited by multi-threads, better use volatile qualifier
+ protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null;
public RemoteFileSystem(String name, StorageBackend.StorageType type) {
super(name, type);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 9f72595ad8..ce297ce920 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -29,6 +29,7 @@ import org.apache.doris.fs.operations.OpParams;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,7 +55,6 @@ import java.security.PrivilegedAction;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
public class DFSFileSystem extends RemoteFileSystem {
@@ -82,42 +82,61 @@ public class DFSFileSystem extends RemoteFileSystem {
conf.set(propEntry.getKey(), propEntry.getValue());
}
- UserGroupInformation ugi = getUgi(conf);
- AtomicReference<Exception> exception = new AtomicReference<>();
+ boolean hasRelogin = false;
+ UserGroupInformation ugi;
+ try {
+ // try use current ugi first to avoid relogin
+ // because it may be a time-consuming task
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ LOG.warn("An IOException occurs when invoke "
+ + "UserGroupInformation.getCurrentUser(), relogin immediately.", e);
+ ugi = doLogin(conf);
+ hasRelogin = true;
+ }
- dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
+ do {
try {
- String username = properties.get(HdfsResource.HADOOP_USER_NAME);
- if (username == null) {
- return FileSystem.get(new Path(remotePath).toUri(), conf);
- } else {
- return FileSystem.get(new Path(remotePath).toUri(), conf, username);
+ dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
+ try {
+ String username = properties.get(HdfsResource.HADOOP_USER_NAME);
+ return username == null
+ ? FileSystem.get(new Path(remotePath).toUri(), conf)
+ : FileSystem.get(new Path(remotePath).toUri(), conf, username);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ LOG.debug("Reuse current ugi for dfs, remote path: {}", remotePath);
+ break;
+ } catch (SecurityException e) {
+ LOG.warn("A SecurityException occurs when invoke ugi.doAs(), "
+ + "relogin and retry immediately.", e);
+ if (hasRelogin) {
+ throw new UserException(e);
}
- } catch (Exception e) {
- exception.set(e);
- return null;
+ ugi = doLogin(conf);
+ hasRelogin = true;
}
- });
+ } while (true);
- if (dfsFileSystem == null) {
- LOG.error("errors while connect to " + remotePath, exception.get());
- throw new UserException("errors while connect to " + remotePath, exception.get());
- }
+ Preconditions.checkNotNull(dfsFileSystem);
operations = new HDFSFileOperations(dfsFileSystem);
return dfsFileSystem;
}
- private UserGroupInformation getUgi(Configuration conf) throws UserException {
- String authentication = conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null);
- if (AuthType.KERBEROS.getDesc().equals(authentication)) {
- conf.set("hadoop.security.authorization", "true");
- UserGroupInformation.setConfiguration(conf);
+ private UserGroupInformation doLogin(Configuration conf) throws UserException {
+ if (AuthType.KERBEROS.getDesc().equals(
+ conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null))) {
+ conf.set(HdfsResource.HADOOP_KERBEROS_AUTHORIZATION, "true");
String principal = conf.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL);
String keytab = conf.get(HdfsResource.HADOOP_KERBEROS_KEYTAB);
+
+ UserGroupInformation.setConfiguration(conf);
try {
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
UserGroupInformation.setLoginUser(ugi);
- LOG.info("kerberos authentication successful");
+ LOG.info("Login by kerberos authentication with principal: {}", principal);
return ugi;
} catch (IOException e) {
throw new UserException(e);
@@ -128,7 +147,10 @@ public class DFSFileSystem extends RemoteFileSystem {
hadoopUserName = "hadoop";
LOG.debug(HdfsResource.HADOOP_USER_NAME + " is unset, use default user: hadoop");
}
- return UserGroupInformation.createRemoteUser(hadoopUserName);
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
+ UserGroupInformation.setLoginUser(ugi);
+ LOG.info("Login by proxy user, hadoop.username: {}", hadoopUserName);
+ return ugi;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org