You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/19 13:34:28 UTC
[doris] 02/36: [feature](multicatalog) enable doris hive/iceberg catalog to read data on tencent GooseFS (#18685)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0-alpha
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 18ccf8a66d172ada321b417f719cbad1b5e9fbab
Author: Yulei-Yang <yu...@gmail.com>
AuthorDate: Sun Apr 16 18:11:57 2023 +0800
[feature](multicatalog) enable doris hive/iceberg catalog to read data on tencent GooseFS (#18685)
---
.../java/org/apache/doris/common/FeConstants.java | 1 +
.../doris/planner/external/HiveScanProvider.java | 2 +
.../external/iceberg/IcebergScanProvider.java | 2 +
.../doris/broker/hdfs/FileSystemManager.java | 43 +++++++++++++++++++++-
4 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 19dff3e707..fc4d273178 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -81,6 +81,7 @@ public class FeConstants {
public static String FS_PREFIX_COS = "cos";
public static String FS_PREFIX_OBS = "obs";
public static String FS_PREFIX_OFS = "ofs";
+ public static String FS_PREFIX_GFS = "gfs";
public static String FS_PREFIX_JFS = "jfs";
public static String FS_PREFIX_HDFS = "hdfs";
public static String FS_PREFIX_FILE = "file";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 99f84a128f..b09b61fea5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -101,6 +101,8 @@ public class HiveScanProvider extends HMSTableScanProvider {
return TFileType.FILE_LOCAL;
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
return TFileType.FILE_BROKER;
+ } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
+ return TFileType.FILE_BROKER;
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
return TFileType.FILE_BROKER;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
index c481362da7..c710afc3fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
@@ -111,6 +111,8 @@ public class IcebergScanProvider extends QueryScanProvider {
return TFileType.FILE_LOCAL;
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
return TFileType.FILE_BROKER;
+ } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
+ return TFileType.FILE_BROKER;
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
return TFileType.FILE_BROKER;
}
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 21cbe48aca..17f4d13216 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -75,6 +75,7 @@ public class FileSystemManager {
private static final String BOS_SCHEME = "bos";
private static final String JFS_SCHEME = "jfs";
private static final String AFS_SCHEME = "afs";
+ private static final String GFS_SCHEME = "gfs";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
@@ -221,7 +222,9 @@ public class FileSystemManager {
brokerFileSystem = getBOSFileSystem(path, properties);
} else if (scheme.equals(JFS_SCHEME)) {
brokerFileSystem = getJuiceFileSystem(path, properties);
- }else {
+ } else if (scheme.equals(GFS_SCHEME)) {
+ brokerFileSystem = getGooseFSFileSystem(path, properties);
+ } else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
}
@@ -973,6 +976,44 @@ public class FileSystemManager {
}
}
+ /**
+ * Looks up (via updateCachedFileSystem) the BrokerFileSystem for a GooseFS "gfs" path, creating its client if unset.
+ * @param path the gfs path to access; its URI authority forms part of the cache identity
+ * @param properties username/password (part of the cache identity); every entry is also copied into the Hadoop conf
+ * @return the BrokerFileSystem, with its underlying Hadoop FileSystem set
+ * @throws BrokerException with NOT_AUTHORIZED when any step of client creation inside the try block fails
+ */
+ public BrokerFileSystem getGooseFSFileSystem(String path, Map<String, String> properties) {
+ WildcardURI pathUri = new WildcardURI(path);
+ // cache identity = "gfs://" + URI authority, paired with the "username,password" string below;
+ // presumably distinct authorities/credentials therefore map to distinct cached entries - verify.
+ String host = GFS_SCHEME + "://" + pathUri.getAuthority();
+
+ String username = properties.getOrDefault(USER_NAME_KEY, "");
+ String password = properties.getOrDefault(PASSWORD_KEY, "");
+ String gfsUgi = username + "," + password; // NOTE(review): the plaintext password becomes part of the cache identity
+ FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, gfsUgi);
+ BrokerFileSystem brokerFileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+ brokerFileSystem.getLock().lock(); // guards the check-then-create of the client below
+ try {
+ if (brokerFileSystem.getDFSFileSystem() == null) {
+ logger.info("create goosefs client: " + path);
+ Configuration conf = new Configuration();
+ for (Map.Entry<String, String> propElement : properties.entrySet()) { // all properties, credentials included, go into conf
+ conf.set(propElement.getKey(), propElement.getValue());
+ }
+ FileSystem fileSystem = FileSystem.get(pathUri.getUri(), conf); // NOTE(review): may return Hadoop's shared cached instance unless fs.gfs.impl.disable.cache is set - confirm
+ brokerFileSystem.setFileSystem(fileSystem);
+ }
+ return brokerFileSystem;
+ } catch (Exception e) { // every failure here is reported to the caller as NOT_AUTHORIZED
+ logger.error("errors while connect to " + path, e);
+ throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+ } finally {
+ brokerFileSystem.getLock().unlock();
+ }
+ }
+
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org