You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/12 17:08:57 UTC
[arrow-rs] branch master updated: feat(object_store): parse well-known storage urls (#3327)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 19f8e8cb0 feat(object_store): parse well-known storage urls (#3327)
19f8e8cb0 is described below
commit 19f8e8cb02d5ece6b64c5fb08d9eeac4b7293911
Author: Robert Pack <42...@users.noreply.github.com>
AuthorDate: Mon Dec 12 18:08:51 2022 +0100
feat(object_store): parse well-known storage urls (#3327)
* feat(object_store): add url parsing to azure builder
* feat(object_store): add url parsing to aws builder
* feat(object_store): add url parsing to gcs builder
* feat(object_store): parse gcs service account from env
* fix: typo
* docs(object_store): fix example / template urls
* feat(object_store): parse S3 virtually hosted urls
* refactor: raise url parsing errors on build
* fix: properly set virtual_hosted_style_request in url parsing
---
object_store/src/aws/mod.rs | 97 +++++++++++++++++++++++++++++++++++-
object_store/src/azure/mod.rs | 111 +++++++++++++++++++++++++++++++++++++++++-
object_store/src/gcp/mod.rs | 95 ++++++++++++++++++++++++++++++++++++
3 files changed, 300 insertions(+), 3 deletions(-)
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index aa419d605..0fcfbaf9c 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -42,6 +42,7 @@ use std::ops::Range;
use std::sync::Arc;
use tokio::io::AsyncWrite;
use tracing::info;
+use url::Url;
use crate::aws::client::{S3Client, S3Config};
use crate::aws::credential::{
@@ -116,6 +117,18 @@ enum Error {
#[snafu(display("Received header containing non-ASCII data"))]
BadHeader { source: reqwest::header::ToStrError },
+
+ #[snafu(display("Unable to parse source url. Url: {}, Error: {}", url, source))]
+ UnableToParseUrl {
+ source: url::ParseError,
+ url: String,
+ },
+
+ #[snafu(display(
+ "Unknown url scheme cannot be parsed into storage location: {}",
+ scheme
+ ))]
+ UnknownUrlScheme { scheme: String },
}
impl From<Error> for super::Error {
@@ -359,6 +372,7 @@ pub struct AmazonS3Builder {
metadata_endpoint: Option<String>,
profile: Option<String>,
client_options: ClientOptions,
+ url_parse_error: Option<Error>,
}
impl AmazonS3Builder {
@@ -430,6 +444,67 @@ impl AmazonS3Builder {
builder
}
+ /// Parse available connection info from a well-known storage URL.
+ ///
+ /// The supported url schemes are:
+ ///
+ /// - `s3://<bucket>/<path>`
+ /// - `s3a://<bucket>/<path>`
+ /// - `https://s3.<bucket>.amazonaws.com`
+ /// - `https://<bucket>.s3.<region>.amazonaws.com`
+ ///
+ /// Please note that this is a best effort implementation, and will not fail for malformed URLs,
+ /// but rather warn and ignore the passed url. The url also has no effect on how the
+ /// storage is accessed - e.g. which driver or protocol is used for reading from the location.
+ ///
+ /// # Example
+ /// ```
+ /// use object_store::aws::AmazonS3Builder;
+ ///
+ /// let s3 = AmazonS3Builder::from_env()
+ /// .with_url("s3://bucket/path")
+ /// .build();
+ /// ```
+ pub fn with_url(mut self, url: impl AsRef<str>) -> Self {
+ let maybe_parsed = Url::parse(url.as_ref());
+ match maybe_parsed {
+ Ok(parsed) => match parsed.scheme() {
+ "s3" | "s3a" => {
+ self.bucket_name = parsed.host_str().map(|host| host.to_owned());
+ }
+ "https" => {
+ if let Some(host) = parsed.host_str() {
+ let parts = host.splitn(4, '.').collect::<Vec<&str>>();
+ if parts.len() == 4 && parts[0] == "s3" && parts[2] == "amazonaws"
+ {
+ self.bucket_name = Some(parts[1].to_string());
+ }
+ if parts.len() == 4
+ && parts[1] == "s3"
+ && parts[3] == "amazonaws.com"
+ {
+ self.bucket_name = Some(parts[0].to_string());
+ self.region = Some(parts[2].to_string());
+ self.virtual_hosted_style_request = true;
+ }
+ }
+ }
+ other => {
+ self.url_parse_error = Some(Error::UnknownUrlScheme {
+ scheme: other.into(),
+ });
+ }
+ },
+ Err(err) => {
+ self.url_parse_error = Some(Error::UnableToParseUrl {
+ source: err,
+ url: url.as_ref().into(),
+ });
+ }
+ };
+ self
+ }
+
/// Set the AWS Access Key (required)
pub fn with_access_key_id(mut self, access_key_id: impl Into<String>) -> Self {
self.access_key_id = Some(access_key_id.into());
@@ -567,6 +642,10 @@ impl AmazonS3Builder {
/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(self) -> Result<AmazonS3> {
+ if let Some(err) = self.url_parse_error {
+ return Err(err.into());
+ }
+
let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
let region = self.region.context(MissingRegionSnafu)?;
@@ -642,8 +721,8 @@ impl AmazonS3Builder {
let endpoint: String;
let bucket_endpoint: String;
- //If `endpoint` is provided then its assumed to be consistent with
- // `virutal_hosted_style_request`. i.e. if `virtual_hosted_style_request` is true then
+ // If `endpoint` is provided then it's assumed to be consistent with
+ // `virtual_hosted_style_request`. i.e. if `virtual_hosted_style_request` is true then
// `endpoint` should have bucket name included.
if self.virtual_hosted_style_request {
endpoint = self.endpoint.unwrap_or_else(|| {
@@ -940,4 +1019,18 @@ mod tests {
err
);
}
+
+ #[test]
+ fn s3_test_urls() {
+ let builder = AmazonS3Builder::new().with_url("s3://bucket/path");
+ assert_eq!(builder.bucket_name, Some("bucket".to_string()));
+
+ let builder = AmazonS3Builder::new().with_url("https://s3.bucket.amazonaws.com");
+ assert_eq!(builder.bucket_name, Some("bucket".to_string()));
+
+ let builder =
+ AmazonS3Builder::new().with_url("https://bucket.s3.region.amazonaws.com");
+ assert_eq!(builder.bucket_name, Some("bucket".to_string()));
+ assert_eq!(builder.region, Some("region".to_string()))
+ }
}
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 4b7131ea8..2cc4fe1a4 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -114,6 +114,12 @@ enum Error {
#[snafu(display("Azure credential error: {}", source), context(false))]
Credential { source: credential::Error },
+
+ #[snafu(display(
+ "Unknown url scheme cannot be parsed into storage location: {}",
+ scheme
+ ))]
+ UnknownUrlScheme { scheme: String },
}
impl From<Error> for super::Error {
@@ -361,6 +367,7 @@ pub struct MicrosoftAzureBuilder {
use_emulator: bool,
retry_config: RetryConfig,
client_options: ClientOptions,
+ url_parse_error: Option<Error>,
}
impl Debug for MicrosoftAzureBuilder {
@@ -379,7 +386,7 @@ impl MicrosoftAzureBuilder {
Default::default()
}
- /// Create an instance of [MicrosoftAzureBuilder] with values pre-populated from environment variables.
+ /// Create an instance of [`MicrosoftAzureBuilder`] with values pre-populated from environment variables.
///
/// Variables extracted from environment:
/// * AZURE_STORAGE_ACCOUNT_NAME: storage account name
@@ -424,6 +431,78 @@ impl MicrosoftAzureBuilder {
builder
}
+ /// Parse available connection info from a well-known storage URL.
+ ///
+ /// The supported url schemes are:
+ ///
+ /// - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
+ /// - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
+ /// - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
+ /// - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
+ /// - `azure://<container>/<path>` (custom)
+ /// - `https://<account>.dfs.core.windows.net`
+ /// - `https://<account>.blob.core.windows.net`
+ ///
+ /// Please note that this is a best effort implementation, and will not fail for malformed URLs,
+ /// but rather warn and ignore the passed url. The url also has no effect on how the
+ /// storage is accessed - e.g. which driver or protocol is used for reading from the location.
+ ///
+ /// # Example
+ /// ```
+ /// use object_store::azure::MicrosoftAzureBuilder;
+ ///
+ /// let azure = MicrosoftAzureBuilder::from_env()
+ /// .with_url("abfss://file_system@account.dfs.core.windows.net/")
+ /// .build();
+ /// ```
+ pub fn with_url(mut self, url: impl AsRef<str>) -> Self {
+ let maybe_parsed = Url::parse(url.as_ref());
+ match maybe_parsed {
+ Ok(parsed) => match parsed.scheme() {
+ "az" | "adl" | "azure" => {
+ self.container_name = parsed.host_str().map(|host| host.to_owned());
+ }
+ "abfs" | "abfss" => {
+ // abfs(s) might refer to the fsspec convention abfs://<container>/<path>
+ // or the convention for the hadoop driver abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>
+ if parsed.username().is_empty() {
+ self.container_name =
+ parsed.host_str().map(|host| host.to_owned());
+ } else if let Some(host) = parsed.host_str() {
+ let parts = host.splitn(2, '.').collect::<Vec<&str>>();
+ if parts.len() == 2 && parts[1] == "dfs.core.windows.net" {
+ self.container_name = Some(parsed.username().to_owned());
+ self.account_name = Some(parts[0].to_string());
+ }
+ }
+ }
+ "https" => {
+ if let Some(host) = parsed.host_str() {
+ let parts = host.splitn(2, '.').collect::<Vec<&str>>();
+ if parts.len() == 2
+ && (parts[1] == "dfs.core.windows.net"
+ || parts[1] == "blob.core.windows.net")
+ {
+ self.account_name = Some(parts[0].to_string());
+ }
+ }
+ }
+ other => {
+ self.url_parse_error = Some(Error::UnknownUrlScheme {
+ scheme: other.into(),
+ });
+ }
+ },
+ Err(err) => {
+ self.url_parse_error = Some(Error::UnableToParseUrl {
+ source: err,
+ url: url.as_ref().into(),
+ });
+ }
+ };
+ self
+ }
+
/// Set the Azure Account (required)
pub fn with_account(mut self, account: impl Into<String>) -> Self {
self.account_name = Some(account.into());
@@ -529,8 +608,13 @@ impl MicrosoftAzureBuilder {
retry_config,
authority_host,
mut client_options,
+ url_parse_error,
} = self;
+ if let Some(err) = url_parse_error {
+ return Err(err.into());
+ }
+
let container = container_name.ok_or(Error::MissingContainerName {})?;
let (is_emulator, storage_url, auth, account) = if use_emulator {
@@ -716,4 +800,29 @@ mod tests {
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
}
+
+ #[test]
+ fn azure_blob_test_urls() {
+ let builder = MicrosoftAzureBuilder::new()
+ .with_url("abfss://file_system@account.dfs.core.windows.net/");
+ assert_eq!(builder.account_name, Some("account".to_string()));
+ assert_eq!(builder.container_name, Some("file_system".to_string()));
+
+ let builder = MicrosoftAzureBuilder::new().with_url("abfs://container/path");
+ assert_eq!(builder.container_name, Some("container".to_string()));
+
+ let builder = MicrosoftAzureBuilder::new().with_url("az://container");
+ assert_eq!(builder.container_name, Some("container".to_string()));
+
+ let builder = MicrosoftAzureBuilder::new().with_url("az://container/path");
+ assert_eq!(builder.container_name, Some("container".to_string()));
+
+ let builder = MicrosoftAzureBuilder::new()
+ .with_url("https://account.dfs.core.windows.net/");
+ assert_eq!(builder.account_name, Some("account".to_string()));
+
+ let builder = MicrosoftAzureBuilder::new()
+ .with_url("https://account.blob.core.windows.net/");
+ assert_eq!(builder.account_name, Some("account".to_string()))
+ }
}
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index f93cbde3d..b3bd57256 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -44,6 +44,7 @@ use reqwest::header::RANGE;
use reqwest::{header, Client, Method, Response, StatusCode};
use snafu::{ResultExt, Snafu};
use tokio::io::AsyncWrite;
+use url::Url;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
@@ -129,6 +130,18 @@ enum Error {
source: crate::client::retry::Error,
path: String,
},
+
+ #[snafu(display("Unable to parse source url. Url: {}, Error: {}", url, source))]
+ UnableToParseUrl {
+ source: url::ParseError,
+ url: String,
+ },
+
+ #[snafu(display(
+ "Unknown url scheme cannot be parsed into storage location: {}",
+ scheme
+ ))]
+ UnknownUrlScheme { scheme: String },
}
impl From<Error> for super::Error {
@@ -766,6 +779,7 @@ pub struct GoogleCloudStorageBuilder {
service_account_path: Option<String>,
retry_config: RetryConfig,
client_options: ClientOptions,
+ url_parse_error: Option<Error>,
}
impl Default for GoogleCloudStorageBuilder {
@@ -775,6 +789,7 @@ impl Default for GoogleCloudStorageBuilder {
service_account_path: None,
retry_config: Default::default(),
client_options: ClientOptions::new().with_allow_http(true),
+ url_parse_error: None,
}
}
}
@@ -785,6 +800,75 @@ impl GoogleCloudStorageBuilder {
Default::default()
}
+ /// Create an instance of [`GoogleCloudStorageBuilder`] with values pre-populated from environment variables.
+ ///
+ /// Variables extracted from environment:
+ /// * GOOGLE_SERVICE_ACCOUNT: location of service account file
+ /// * SERVICE_ACCOUNT: (alias) location of service account file
+ ///
+ /// # Example
+ /// ```
+ /// use object_store::gcp::GoogleCloudStorageBuilder;
+ ///
+ /// let gcs = GoogleCloudStorageBuilder::from_env()
+ /// .with_bucket_name("foo")
+ /// .build();
+ /// ```
+ pub fn from_env() -> Self {
+ let mut builder = Self::default();
+
+ if let Ok(service_account_path) = std::env::var("SERVICE_ACCOUNT") {
+ builder.service_account_path = Some(service_account_path);
+ }
+
+ if let Ok(service_account_path) = std::env::var("GOOGLE_SERVICE_ACCOUNT") {
+ builder.service_account_path = Some(service_account_path);
+ }
+
+ builder
+ }
+
+ /// Parse available connection info from a well-known storage URL.
+ ///
+ /// The supported url schemes are:
+ ///
+ /// - `gs://<bucket>/<path>`
+ ///
+ /// Please note that this is a best effort implementation, and will not fail for malformed URLs,
+ /// but rather warn and ignore the passed url. The url also has no effect on how the
+ /// storage is accessed - e.g. which driver or protocol is used for reading from the location.
+ ///
+ /// # Example
+ /// ```
+ /// use object_store::gcp::GoogleCloudStorageBuilder;
+ ///
+ /// let gcs = GoogleCloudStorageBuilder::from_env()
+ /// .with_url("gs://bucket/path")
+ /// .build();
+ /// ```
+ pub fn with_url(mut self, url: impl AsRef<str>) -> Self {
+ let maybe_parsed = Url::parse(url.as_ref());
+ match maybe_parsed {
+ Ok(parsed) => match parsed.scheme() {
+ "gs" => {
+ self.bucket_name = parsed.host_str().map(|host| host.to_owned());
+ }
+ other => {
+ self.url_parse_error = Some(Error::UnknownUrlScheme {
+ scheme: other.into(),
+ });
+ }
+ },
+ Err(err) => {
+ self.url_parse_error = Some(Error::UnableToParseUrl {
+ source: err,
+ url: url.as_ref().into(),
+ });
+ }
+ };
+ self
+ }
+
/// Set the bucket name (required)
pub fn with_bucket_name(mut self, bucket_name: impl Into<String>) -> Self {
self.bucket_name = Some(bucket_name.into());
@@ -838,8 +922,13 @@ impl GoogleCloudStorageBuilder {
service_account_path,
retry_config,
client_options,
+ url_parse_error,
} = self;
+ if let Some(err) = url_parse_error {
+ return Err(err.into());
+ }
+
let bucket_name = bucket_name.ok_or(Error::MissingBucketName {})?;
let service_account_path =
service_account_path.ok_or(Error::MissingServiceAccountPath)?;
@@ -1095,4 +1184,10 @@ mod test {
err
);
}
+
+ #[test]
+ fn gcs_test_urls() {
+ let builder = GoogleCloudStorageBuilder::new().with_url("gs://bucket/path");
+ assert_eq!(builder.bucket_name, Some("bucket".to_string()))
+ }
}