You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2022/12/30 05:01:59 UTC
[doris] branch branch-1.2-lts updated: [Broker](bos) support baidu bos object storage for broker (#15448)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 7ccd504932 [Broker](bos) support baidu bos object storage for broker (#15448)
7ccd504932 is described below
commit 7ccd5049325515d21489eaee506084f47d3a4101
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Fri Dec 30 12:39:10 2022 +0800
[Broker](bos) support baidu bos object storage for broker (#15448)
---
.../java/org/apache/doris/analysis/ExportStmt.java | 8 +-
.../doris/broker/hdfs/FileSystemIdentity.java | 17 +++
.../doris/broker/hdfs/FileSystemManager.java | 119 +++++++++++++++++++--
3 files changed, 135 insertions(+), 9 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 077729c81e..62fb2c9977 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -236,14 +236,16 @@ public class ExportStmt extends StatementBase {
URI uri = URI.create(path);
String schema = uri.getScheme();
if (type == StorageBackend.StorageType.BROKER) {
- if (schema == null || (!schema.equalsIgnoreCase("hdfs")
+ if (schema == null || (!schema.equalsIgnoreCase("bos")
+ && !schema.equalsIgnoreCase("afs")
+ && !schema.equalsIgnoreCase("hdfs")
&& !schema.equalsIgnoreCase("ofs")
&& !schema.equalsIgnoreCase("obs")
&& !schema.equalsIgnoreCase("oss")
&& !schema.equalsIgnoreCase("s3a")
&& !schema.equalsIgnoreCase("cosn"))) {
- throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'ofs://', 'obs://',"
- + "'oss://', 's3a://' or 'cosn://' path.");
+ throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
+ + " 'ofs://', 'obs://', 'oss://', 's3a://' or 'cosn://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java
index 9446638a52..2885ba1a58 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java
@@ -21,10 +21,19 @@ public class FileSystemIdentity {
private final String hostName;
private final String ugiInfo;
+
+ private final String extraInfo;
public FileSystemIdentity(String hostName, String ugiInfo) {
this.hostName = hostName;
this.ugiInfo = ugiInfo;
+ this.extraInfo = null;
+ }
+
+ public FileSystemIdentity(String hostName, String ugiInfo, String extraInfo) {
+ this.hostName = hostName;
+ this.ugiInfo = ugiInfo;
+ this.extraInfo = extraInfo;
}
@Override
@@ -34,6 +43,7 @@ public class FileSystemIdentity {
result = prime * result
+ ((hostName == null) ? 0 : hostName.hashCode());
result = prime * result + ((ugiInfo == null) ? 0 : ugiInfo.hashCode());
+ result = prime * result + ((extraInfo == null) ? 0 : extraInfo.hashCode());
return result;
}
@@ -63,6 +73,13 @@ public class FileSystemIdentity {
} else if (!ugiInfo.equals(other.ugiInfo)) {
return false;
}
+ if (extraInfo == null) {
+ if (other.extraInfo != null) {
+ return false;
+ }
+ } else if (!extraInfo.equals(other.extraInfo)) {
+ return false;
+ }
return true;
}
}
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 42ff97123b..27b548af8c 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -42,6 +42,7 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@@ -71,6 +72,8 @@ public class FileSystemManager {
private static final String OBS_SCHEME = "obs";
private static final String OSS_SCHEME = "oss";
private static final String COS_SCHEME = "cosn";
+ private static final String BOS_SCHEME = "bos";
+ private static final String AFS_SCHEME = "afs";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
@@ -132,6 +135,23 @@ public class FileSystemManager {
private static final String FS_COS_IMPL = "fs.cosn.impl";
private static final String FS_COS_IMPL_DISABLE_CACHE = "fs.cosn.impl.disable.cache";
+ // arguments for bos
+ private static final String FS_BOS_ACCESS_KEY = "fs.bos.access.key";
+ private static final String FS_BOS_SECRET_KEY = "fs.bos.secret.access.key";
+ private static final String FS_BOS_ENDPOINT = "fs.bos.endpoint";
+ private static final String FS_BOS_IMPL = "fs.bos.impl";
+ private static final String FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE = "fs.bos.multipart.uploads.block.size";
+
+
+ // arguments for afs
+ private static final String HADOOP_JOB_GROUP_NAME = "hadoop.job.group.name";
+ private static final String HADOOP_JOB_UGI = "hadoop.job.ugi";
+ private static final String FS_DEFAULT_NAME = "fs.default.name";
+ private static final String FS_AFS_IMPL = "fs.afs.impl";
+ private static final String DFS_AGENT_PORT = "dfs.agent.port";
+ private static final String DFS_CLIENT_AUTH_METHOD = "dfs.client.auth.method";
+ private static final String DFS_RPC_TIMEOUT = "dfs.rpc.timeout";
+
private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
private int readBufferSize = 128 << 10; // 128k
@@ -197,6 +217,8 @@ public class FileSystemManager {
brokerFileSystem = getOSSFileSystem(path, properties);
} else if (scheme.equals(COS_SCHEME)) {
brokerFileSystem = getCOSFileSystem(path, properties);
+ } else if (scheme.equals(BOS_SCHEME)) {
+ brokerFileSystem = getBOSFileSystem(path, properties);
} else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
@@ -548,8 +570,8 @@ public class FileSystemManager {
String endpoint = properties.getOrDefault(FS_OSS_ENDPOINT, "");
String disableCache = properties.getOrDefault(FS_OSS_IMPL_DISABLE_CACHE, "true");
String host = OSS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
- String obsUgi = accessKey + "," + secretKey;
- FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, obsUgi);
+ String ossUgi = accessKey + "," + secretKey;
+ FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ossUgi);
cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
fileSystem.getLock().lock();
@@ -608,11 +630,11 @@ public class FileSystemManager {
} else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
kerberosContent = properties.get(KERBEROS_KEYTAB_CONTENT);
} else {
- throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+ throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"keytab is required for kerberos authentication");
}
if (!properties.containsKey(KERBEROS_PRINCIPAL)) {
- throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+ throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"principal is required for kerberos authentication");
} else {
kerberosContent = kerberosContent + properties.get(KERBEROS_PRINCIPAL);
@@ -651,8 +673,8 @@ public class FileSystemManager {
// pass kerberos keytab content use base64 encoding
// so decode it and write it to tmp path under /tmp
// because ugi api only accept a local path as argument
- String keytab_content = properties.get(KERBEROS_KEYTAB_CONTENT);
- byte[] base64decodedBytes = Base64.getDecoder().decode(keytab_content);
+ String keytabContent = properties.get(KERBEROS_KEYTAB_CONTENT);
+ byte[] base64decodedBytes = Base64.getDecoder().decode(keytabContent);
long currentTime = System.currentTimeMillis();
Random random = new Random(currentTime);
int randNumber = random.nextInt(10000);
@@ -737,6 +759,91 @@ public class FileSystemManager {
}
}
+ /**
+ * visible for test
+ * <p>
+ * file system handle is cached, the identity is endpoint + bucket + "accessKey,secretKey"
+ *
+ * @param path BOS path; the host part of its URI is used as the bucket
+ * @param properties fs.bos.* settings: access key, secret key and endpoint (each defaults to ""),
+ * and the multipart upload block size (defaults to "9437184")
+ * @return the BrokerFileSystem from updateCachedFileSystem, with its Hadoop FileSystem created on first use
+ * @throws BrokerException with NOT_AUTHORIZED if any exception occurs while creating the file system
+ */
+ public BrokerFileSystem getBOSFileSystem(String path, Map<String, String> properties) {
+ WildcardURI pathUri = new WildcardURI(path);
+ String accessKey = properties.getOrDefault(FS_BOS_ACCESS_KEY, "");
+ String secretKey = properties.getOrDefault(FS_BOS_SECRET_KEY, "");
+ String endpoint = properties.getOrDefault(FS_BOS_ENDPOINT, "");
+ String multiPartUploadBlockSize = properties.getOrDefault(FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE, "9437184");
+ // endpoint is the server host, pathUri.getUri().getHost() is the bucket
+ // we should use these two params as the host identity, because FileSystem will cache both.
+ String host = BOS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
+ String bosUgi = accessKey + "," + secretKey;
+ FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, bosUgi);
+ BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+ fileSystem.getLock().lock();
+ try {
+ if (fileSystem.getDFSFileSystem() == null) {
+ logger.info("could not find file system for path " + path + " create a new one");
+ // create a new filesystem
+ Configuration conf = new Configuration();
+ conf.set(FS_BOS_ACCESS_KEY, accessKey);
+ conf.set(FS_BOS_SECRET_KEY, secretKey);
+ conf.set(FS_BOS_ENDPOINT, endpoint);
+ conf.set(FS_BOS_IMPL, "org.apache.hadoop.fs.bos.BaiduBosFileSystem");
+ conf.set(FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE, multiPartUploadBlockSize);
+ FileSystem bosFileSystem = FileSystem.get(pathUri.getUri(), conf);
+ fileSystem.setFileSystem(bosFileSystem);
+ }
+ return fileSystem;
+ } catch (Exception e) {
+ logger.error("errors while connect to " + path, e);
+ throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+ } finally {
+ fileSystem.getLock().unlock();
+ }
+ }
+
+ private BrokerFileSystem getAfsFileSystem(String path, Map<String, String> properties) {
+ URI pathUri = new WildcardURI(path).getUri();
+ String host = pathUri.getScheme() + "://" + pathUri.getAuthority();
+ String username = properties.containsKey(USER_NAME_KEY) ? properties.get(USER_NAME_KEY) : "";
+ String password = properties.containsKey(PASSWORD_KEY) ? properties.get(PASSWORD_KEY) : "";
+ String group = properties.containsKey(HADOOP_JOB_GROUP_NAME) ? properties.get(HADOOP_JOB_GROUP_NAME) : "";
+ String afsUgi = username + "," + password;
+ FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, afsUgi, group);
+ cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+ BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+ fileSystem.getLock().lock();
+ try {
+ if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+ // the identity is gone from the cache after we took the lock: this means the file system
+ // was closed by the file system checker thread; it is a corner case, so callers get null
+ return null;
+ }
+ if (fileSystem.getDFSFileSystem() == null) {
+ logger.info("could not find file system for path " + path + " create a new one");
+ // create a new file system from the ugi, group, host and the dfs.* client properties
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_JOB_UGI, afsUgi);
+ conf.set(HADOOP_JOB_GROUP_NAME, group);
+ conf.set(FS_DEFAULT_NAME, host);
+ conf.set(FS_AFS_IMPL, "org.apache.hadoop.fs.DFileSystem");
+ conf.set(DFS_CLIENT_AUTH_METHOD, properties.getOrDefault(DFS_CLIENT_AUTH_METHOD, "3"));
+ conf.set(DFS_AGENT_PORT, properties.getOrDefault(DFS_AGENT_PORT, "20001"));
+ conf.set(DFS_RPC_TIMEOUT, properties.getOrDefault(DFS_RPC_TIMEOUT, "300000"));
+ FileSystem dfsFileSystem = FileSystem.get(URI.create(host), conf);
+ fileSystem.setFileSystem(dfsFileSystem);
+ }
+ return fileSystem;
+ } catch (Exception e) {
+ logger.error("errors while connect to " + path, e);
+ throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e, e.getMessage());
+ } finally {
+ fileSystem.getLock().unlock();
+ }
+ }
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org