You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2020/03/01 04:53:57 UTC
[incubator-doris] branch master updated: Support Amazon S3 data
source in Broker Load (#3004)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 078e35a Support Amazon S3 data source in Broker Load (#3004)
078e35a is described below
commit 078e35a62e07d54236a96e3f1043ddfe0a6c6382
Author: frwrdt <38...@users.noreply.github.com>
AuthorDate: Sun Mar 1 12:53:50 2020 +0800
Support Amazon S3 data source in Broker Load (#3004)
---
.../Data Manipulation/BROKER LOAD.md | 26 +++++-
fs_brokers/apache_hdfs_broker/pom.xml | 7 ++
.../doris/broker/hdfs/FileSystemManager.java | 101 +++++++++++++++++++--
.../org/apache/doris/common/BrokerPerfMonitor.java | 2 +-
.../doris/broker/hdfs/TestFileSystemManager.java | 11 +++
5 files changed, 138 insertions(+), 9 deletions(-)
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index a89f26c..08061b2 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
@@ -28,6 +28,7 @@ under the License.
2. Baidu AFS:百度内部的 afs,仅限于百度内部使用。
3. Baidu Object Storage(BOS):百度对象存储。仅限百度内部用户、公有云用户或其他可以访问 BOS 的用户使用。
4. Apache HDFS:社区版本 hdfs。
+ 5. Amazon S3:Amazon对象存储。
语法:
@@ -145,6 +146,13 @@ under the License.
dfs.namenode.rpc-address.xxx.nn:指定 namenode 的rpc地址信息。其中 nn 表示 dfs.ha.namenodes.xxx 中配置的 namenode 的名字,如:"dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
dfs.client.failover.proxy.provider:指定 client 连接 namenode 的 provider,默认为:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
+ 4. Amazon S3
+
+ 需提供:
+ fs.s3a.access.key:Amazon S3 的 access key
+ fs.s3a.secret.key:Amazon S3 的 secret key
+ fs.s3a.endpoint:Amazon S3 的 endpoint,如 s3.amazonaws.com
+
4. opt_properties
用于指定一些特殊参数。
@@ -375,7 +383,23 @@ under the License.
INTO TABLE `my_table`
where k1 > k2
)
-
+
+ 11. 从 AmazonS3 导入Parquet文件中数据,指定 FORMAT 为parquet,默认是通过文件后缀判断:
+
+ LOAD LABEL example_db.label11
+ (
+ DATA INFILE("s3a://my-bucket/input/file")
+ INTO TABLE `my_table`
+ FORMAT AS "parquet"
+ (k1, k2, k3)
+ )
+ WITH BROKER my_s3a_broker
+ (
+ "fs.s3a.access.key" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
+ "fs.s3a.secret.key" = "yyyyyyyyyyyyyyyyyyyy",
+ "fs.s3a.endpoint" = "s3.amazonaws.com"
+ )
+
## keyword
BROKER,LOAD
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml b/fs_brokers/apache_hdfs_broker/pom.xml
index cc95619..1db9c68 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -266,6 +266,13 @@ under the License.
<version>1.3.2</version>
</dependency>
+ <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>2.7.3</version>
+ </dependency>
+
</dependencies>
<build>
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 5e0d90f..8b0274a 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -61,7 +61,10 @@ public class FileSystemManager {
private static Logger logger = Logger
.getLogger(FileSystemManager.class.getName());
- private static final String HDFS_SCHEME = "hdfs://";
+ // Supported URI schemes, stored WITHOUT the "://" suffix: getFileSystem compares them
+ // against the scheme of the request path, and callers append "://" when building a host.
+ private static final String HDFS_SCHEME = "hdfs";
+ private static final String S3A_SCHEME = "s3a";
+
private static final String HDFS_UGI_CONF = "hadoop.job.ugi";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
@@ -81,7 +84,12 @@ public class FileSystemManager {
private static final String DEFAULT_DFS_CLIENT_FAILOVER_PROXY_PROVIDER =
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider";
private static final String FS_DEFAULTFS_KEY = "fs.defaultFS";
-
+
+ // Broker property keys for s3a access. getS3AFileSystem reads them from the load
+ // properties (defaulting to "") and sets them under the same names on the Hadoop Configuration.
+ private static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+ private static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+ private static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+
private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
private int readBufferSize = 128 << 10; // 128k
@@ -120,18 +128,46 @@ public class FileSystemManager {
/**
* visible for test
*
+ * @param path
+ * @param properties
+ * @return BrokerFileSystem with different FileSystem based on scheme
+ * @throws URISyntaxException
+ * @throws Exception
+ */
+ public BrokerFileSystem getFileSystem(String path, Map<String, String> properties) {
+ WildcardURI pathUri = new WildcardURI(path);
+ String scheme = pathUri.getUri().getScheme();
+ if (Strings.isNullOrEmpty(scheme)) {
+ throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
+ "invalid path. scheme is null");
+ }
+ BrokerFileSystem brokerFileSystem = null;
+ if (scheme.equals(HDFS_SCHEME)) {
+ brokerFileSystem = getDistributedFileSystem(path, properties);
+ } else if (scheme.equals(S3A_SCHEME)) {
+ brokerFileSystem = getS3AFileSystem(path, properties);
+ } else {
+ throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
+ "invalid path. scheme is not supported");
+ }
+ return brokerFileSystem;
+ }
+
+ /**
+ * visible for test
+ *
* file system handle is cached, the identity is host + username_password
* it will have safety problem if only hostname is used because one user may specify username and password
* and then access hdfs, another user may not specify username and password but could also access data
* @param path
* @param properties
* @return
- * @throws URISyntaxException
- * @throws Exception
+ * @throws URISyntaxException
+ * @throws Exception
*/
- public BrokerFileSystem getFileSystem(String path, Map<String, String> properties) {
+ public BrokerFileSystem getDistributedFileSystem(String path, Map<String, String> properties) {
WildcardURI pathUri = new WildcardURI(path);
- String host = HDFS_SCHEME + pathUri.getAuthority();
+ String host = HDFS_SCHEME + "://" + pathUri.getAuthority();
if (Strings.isNullOrEmpty(pathUri.getAuthority())) {
if (properties.containsKey(FS_DEFAULTFS_KEY)) {
host = properties.get(FS_DEFAULTFS_KEY);
@@ -307,7 +343,58 @@ public class FileSystemManager {
fileSystem.getLock().unlock();
}
}
-
+
+    /**
+     * visible for test
+     *
+     * Returns the cached broker file system for an s3a path, creating the underlying Hadoop
+     * FileSystem on first use.
+     *
+     * The cache identity is endpoint + bucket (host) plus accessKey,secretKey (ugi). The bucket
+     * must be part of the identity because an S3AFileSystem only serves the bucket of the URI it
+     * was initialized with, so one cached handle must not be shared across buckets.
+     *
+     * @param path       s3a path, e.g. s3a://bucket/dir/file
+     * @param properties broker properties; fs.s3a.access.key, fs.s3a.secret.key and
+     *                   fs.s3a.endpoint are read, each defaulting to ""
+     * @return the cached BrokerFileSystem, or null if the cache entry was removed concurrently
+     *         by the file system checker thread
+     * @throws BrokerException INVALID_INPUT_FILE_PATH if the path has no bucket;
+     *                         NOT_AUTHORIZED if creating the Hadoop FileSystem fails
+     */
+    public BrokerFileSystem getS3AFileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        // For s3a URIs the authority is the bucket. getAuthority() is used instead of
+        // getUri().getHost() because java.net.URI reports no host for names containing '_'.
+        String bucket = pathUri.getAuthority();
+        if (Strings.isNullOrEmpty(bucket)) {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
+                    "invalid path. bucket is null");
+        }
+        String accessKey = properties.getOrDefault(FS_S3A_ACCESS_KEY, "");
+        String secretKey = properties.getOrDefault(FS_S3A_SECRET_KEY, "");
+        String endpoint = properties.getOrDefault(FS_S3A_ENDPOINT, "");
+        String host = S3A_SCHEME + "://" + endpoint + "/" + bucket;
+        String s3aUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, s3aUgi);
+        cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+        BrokerFileSystem fileSystem = cachedFileSystem.get(fileSystemIdentity);
+        if (fileSystem == null) {
+            // it means it is removed concurrently by checker thread
+            return null;
+        }
+        fileSystem.getLock().lock();
+        try {
+            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+                // the file system was closed by the file system checker thread (corner case)
+                return null;
+            }
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("could not find file system for path " + path + " create a new one");
+                Configuration conf = new Configuration();
+                conf.set(FS_S3A_ACCESS_KEY, accessKey);
+                conf.set(FS_S3A_SECRET_KEY, secretKey);
+                conf.set(FS_S3A_ENDPOINT, endpoint);
+                // Use a private instance rather than FileSystem.get(). Hadoop's own FileSystem
+                // cache is keyed by scheme + authority + process user, NOT by the s3a keys, so
+                // FileSystem.get() would hand one caller's authenticated S3AFileSystem to a
+                // caller with different keys. Closing it when this broker entry is closed would
+                // also break every other holder.
+                FileSystem s3AFileSystem = FileSystem.newInstance(pathUri.getUri(), conf);
+                fileSystem.setFileSystem(s3AFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/BrokerPerfMonitor.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/BrokerPerfMonitor.java
index 5780405..346472b 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/BrokerPerfMonitor.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/BrokerPerfMonitor.java
@@ -22,7 +22,7 @@ import com.google.common.base.Stopwatch;
public class BrokerPerfMonitor {
public static Stopwatch startWatch() {
- Stopwatch stopwatch = new Stopwatch();
+ Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
return stopwatch;
}
diff --git a/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
index 3eaafec..7c89b72 100644
--- a/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
@@ -200,4 +200,15 @@ public class TestFileSystemManager extends TestCase {
isPathExist = fileSystemManager.checkPathExist(tempFile2, properties);
assertFalse(isPathExist);
}
+
+    @Test
+    public void testGetFileSystemForS3aScheme() throws IOException {
+        // Placeholder credentials and endpoint for an s3a path.
+        Map<String, String> s3aProperties = new HashMap<String, String>();
+        s3aProperties.put("fs.s3a.access.key", "accessKey");
+        s3aProperties.put("fs.s3a.secret.key", "secretKey");
+        s3aProperties.put("fs.s3a.endpoint", "s3.test.com");
+        String s3aPath = "s3a://testbucket/data/abc/logs";
+        BrokerFileSystem brokerFs = fileSystemManager.getFileSystem(s3aPath, s3aProperties);
+        assertNotNull(brokerFs);
+        // release the underlying Hadoop file system created for this test
+        brokerFs.getDFSFileSystem().close();
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org