You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/04/12 07:37:36 UTC
[incubator-doris] branch branch-0.15 updated: [Feature] add support for tencent chdfs (#8964)
This is an automated email from the ASF dual-hosted git repository.
lide pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/branch-0.15 by this push:
new a90302b549 [Feature] add support for tencent chdfs (#8964)
a90302b549 is described below
commit a90302b549617d854f6d9792b465b0b31c7b723f
Author: wucheng <wu...@foxmail.com>
AuthorDate: Tue Apr 12 15:37:31 2022 +0800
[Feature] add support for tencent chdfs (#8964)
Co-authored-by: chengwu <ch...@tencent.com>
---
.../java/org/apache/doris/analysis/ExportStmt.java | 5 +-
.../org/apache/doris/analysis/StorageBackend.java | 3 +
.../doris/broker/hdfs/FileSystemManager.java | 135 ++++++++++++++++++++-
gensrc/thrift/Types.thrift | 1 +
4 files changed, 141 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 275dc51bf1..095dbbd68a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -238,8 +238,9 @@ public class ExportStmt extends StatementBase {
String schema = uri.getScheme();
if (type == StorageBackend.StorageType.BROKER) {
if (schema == null || (!schema.equalsIgnoreCase("bos") && !schema.equalsIgnoreCase("afs")
- && !schema.equalsIgnoreCase("hdfs"))) {
- throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' or 'BOS://' path.");
+ && !schema.equalsIgnoreCase("hdfs") && !schema.equalsIgnoreCase("ofs"))) {
+ throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' , 'BOS://', "
+ + "or 'ofs://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 933153b728..70b10b4f06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -111,6 +111,7 @@ public class StorageBackend extends StorageDesc implements ParseNode {
public enum StorageType {
BROKER("Doris Broker"),
S3("Amazon S3 Simple Storage Service"),
+ OFS("Tencent CHDFS"),
// the following is not used currently
HDFS("Hadoop Distributed File System"),
LOCAL("Local file system");
@@ -132,6 +133,8 @@ public class StorageBackend extends StorageDesc implements ParseNode {
return TStorageBackendType.S3;
case HDFS:
return TStorageBackendType.HDFS;
+ case OFS:
+ return TStorageBackendType.OFS;
case LOCAL:
return TStorageBackendType.LOCAL;
default:
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 6cedd70449..526c914c7e 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -66,6 +66,7 @@ public class FileSystemManager {
// supported scheme
private static final String HDFS_SCHEME = "hdfs";
private static final String S3A_SCHEME = "s3a";
+ private static final String CHDFS_SCHEME = "ofs";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
@@ -152,7 +153,9 @@ public class FileSystemManager {
brokerFileSystem = getDistributedFileSystem(path, properties);
} else if (scheme.equals(S3A_SCHEME)) {
brokerFileSystem = getS3AFileSystem(path, properties);
- } else {
+ } else if (scheme.equals(CHDFS_SCHEME)) {
+ brokerFileSystem = getChdfsFileSystem(path, properties);
+ }else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
}
@@ -421,6 +424,136 @@ public class FileSystemManager {
}
}
+ /**
+ * Visible for testing.
+ *
+ * The file system handle is cached; a single identity is shared for all CHDFS file systems.
+ * @param path
+ * @param properties
+ * @return
+ * @throws URISyntaxException
+ * @throws Exception
+ */
+ public BrokerFileSystem getChdfsFileSystem(String path, Map<String, String> properties) {
+ WildcardURI pathUri = new WildcardURI(path);
+ String host = CHDFS_SCHEME;
+ String authentication = properties.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ AUTHENTICATION_SIMPLE);
+ if (Strings.isNullOrEmpty(authentication) || (!authentication.equals(AUTHENTICATION_SIMPLE)
+ && !authentication.equals(AUTHENTICATION_KERBEROS))) {
+ logger.warn("invalid authentication:" + authentication);
+ throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+ "invalid authentication:" + authentication);
+ }
+
+ FileSystemIdentity fileSystemIdentity = null;
+ if (authentication.equals(AUTHENTICATION_SIMPLE)) {
+ fileSystemIdentity = new FileSystemIdentity(host, "");
+ } else {
+ // for kerberos, use host + principal + keytab as the fileSystemIdentity
+ String kerberosContent = "";
+ if (properties.containsKey(KERBEROS_KEYTAB)) {
+ kerberosContent = properties.get(KERBEROS_KEYTAB);
+ } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+ kerberosContent = properties.get(KERBEROS_KEYTAB_CONTENT);
+ } else {
+ throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+ "keytab is required for kerberos authentication");
+ }
+ if (!properties.containsKey(KERBEROS_PRINCIPAL)) {
+ throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+ "principal is required for kerberos authentication");
+ } else {
+ kerberosContent = kerberosContent + properties.get(KERBEROS_PRINCIPAL);
+ }
+ try {
+ MessageDigest digest = MessageDigest.getInstance("md5");
+ byte[] result = digest.digest(kerberosContent.getBytes());
+ String kerberosUgi = new String(result);
+ fileSystemIdentity = new FileSystemIdentity(host, kerberosUgi);
+ } catch (NoSuchAlgorithmException e) {
+ throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+ e.getMessage());
+ }
+ }
+
+
+ BrokerFileSystem fileSystem = null;
+ cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+ fileSystem = cachedFileSystem.get(fileSystemIdentity);
+ if (fileSystem == null) {
+ // it means it is removed concurrently by checker thread
+ return null;
+ }
+ fileSystem.getLock().lock();
+
+ try {
+ if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+ // this means the file system is closed by file system checker thread
+ // it is a corner case
+ return null;
+ }
+ // create a new filesystem
+ Configuration conf = new Configuration();
+ for (Map.Entry<String, String> propElement : properties.entrySet()) {
+ conf.set(propElement.getKey(), propElement.getValue());
+ }
+
+ if (fileSystem.getDFSFileSystem() == null) {
+ logger.info("create file system for new path " + path);
+ String tmpFilePath = null;
+ if (authentication.equals(AUTHENTICATION_KERBEROS)){
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ AUTHENTICATION_KERBEROS);
+
+ String principal = preparePrincipal(properties.get(KERBEROS_PRINCIPAL));
+ String keytab = "";
+ if (properties.containsKey(KERBEROS_KEYTAB)) {
+ keytab = properties.get(KERBEROS_KEYTAB);
+ } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+ // the kerberos keytab content is passed base64-encoded,
+ // so decode it and write it to a tmp path under /tmp,
+ // because the UGI API only accepts a local file path as argument
+ String keytab_content = properties.get(KERBEROS_KEYTAB_CONTENT);
+ byte[] base64decodedBytes = Base64.getDecoder().decode(keytab_content);
+ long currentTime = System.currentTimeMillis();
+ Random random = new Random(currentTime);
+ int randNumber = random.nextInt(10000);
+ tmpFilePath = "/tmp/." + Long.toString(currentTime) + "_" + Integer.toString(randNumber);
+ FileOutputStream fileOutputStream = new FileOutputStream(tmpFilePath);
+ fileOutputStream.write(base64decodedBytes);
+ fileOutputStream.close();
+ keytab = tmpFilePath;
+ } else {
+ throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+ "keytab is required for kerberos authentication");
+ }
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.loginUserFromKeytab(principal, keytab);
+ if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+ try {
+ File file = new File(tmpFilePath);
+ if(!file.delete()){
+ logger.warn("delete tmp file:" + tmpFilePath + " failed");
+ }
+ } catch (Exception e) {
+ throw new BrokerException(TBrokerOperationStatusCode.FILE_NOT_FOUND,
+ e.getMessage());
+ }
+ }
+ }
+ FileSystem chdfsFileSystem = FileSystem.get(pathUri.getUri(), conf);
+ fileSystem.setFileSystem(chdfsFileSystem);
+ }
+ return fileSystem;
+ } catch (Exception e) {
+ logger.error("errors while connect to " + path, e);
+ throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+ } finally {
+ fileSystem.getLock().unlock();
+ }
+ }
+
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 8c9a46ff48..ac0838b4db 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -91,6 +91,7 @@ enum TTypeNodeType {
enum TStorageBackendType {
BROKER,
S3,
+ OFS,
HDFS,
LOCAL
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org