You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/12 17:08:57 UTC

[arrow-rs] branch master updated: feat(object_store): parse well-known storage urls (#3327)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 19f8e8cb0 feat(object_store): parse well-known storage urls (#3327)
19f8e8cb0 is described below

commit 19f8e8cb02d5ece6b64c5fb08d9eeac4b7293911
Author: Robert Pack <42...@users.noreply.github.com>
AuthorDate: Mon Dec 12 18:08:51 2022 +0100

    feat(object_store): parse well-known storage urls (#3327)
    
    * feat(object_store): add url parsing to azure builder
    
    * feat(object_store): add url parsing to aws builder
    
    * feat(object_store): add url parsing to gcs builder
    
    * feat(object_store): parse gcs service account from env
    
    * fix: typo
    
    * docs(object_store): fix example / template urls
    
    * feat(object_store): parse S3 virtually hosted urls
    
    * refactor: raise url parsing errors on build
    
    * fix: properly set virtual_hosted_style_request in url parsing
---
 object_store/src/aws/mod.rs   |  97 +++++++++++++++++++++++++++++++++++-
 object_store/src/azure/mod.rs | 111 +++++++++++++++++++++++++++++++++++++++++-
 object_store/src/gcp/mod.rs   |  95 ++++++++++++++++++++++++++++++++++++
 3 files changed, 300 insertions(+), 3 deletions(-)

diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index aa419d605..0fcfbaf9c 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -42,6 +42,7 @@ use std::ops::Range;
 use std::sync::Arc;
 use tokio::io::AsyncWrite;
 use tracing::info;
+use url::Url;
 
 use crate::aws::client::{S3Client, S3Config};
 use crate::aws::credential::{
@@ -116,6 +117,18 @@ enum Error {
 
     #[snafu(display("Received header containing non-ASCII data"))]
     BadHeader { source: reqwest::header::ToStrError },
+
+    #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
+    UnableToParseUrl {
+        source: url::ParseError,
+        url: String,
+    },
+
+    #[snafu(display(
+        "Unknown url scheme cannot be parsed into storage location: {}",
+        scheme
+    ))]
+    UnknownUrlScheme { scheme: String },
 }
 
 impl From<Error> for super::Error {
@@ -359,6 +372,7 @@ pub struct AmazonS3Builder {
     metadata_endpoint: Option<String>,
     profile: Option<String>,
     client_options: ClientOptions,
+    url_parse_error: Option<Error>,
 }
 
 impl AmazonS3Builder {
@@ -430,6 +444,67 @@ impl AmazonS3Builder {
         builder
     }
 
+    /// Parse available connection info from a well-known storage URL.
+    ///
+    /// The supported url schemes are:
+    ///
+    /// - `s3://<bucket>/<path>`
+    /// - `s3a://<bucket>/<path>`
+    /// - `https://s3.<bucket>.amazonaws.com`
+    /// - `https://<bucket>.s3.<region>.amazonaws.com`
+    ///
+    /// Please note that this is a best effort implementation, and will not fail for malformed URLs,
+    /// but rather warn and ignore the passed url. The url also has no effect on how the
+    /// storage is accessed - e.g. which driver or protocol is used for reading from the location.
+    ///
+    /// # Example
+    /// ```
+    /// use object_store::aws::AmazonS3Builder;
+    ///
+    /// let s3 = AmazonS3Builder::from_env()
+    ///     .with_url("s3://bucket/path")
+    ///     .build();
+    /// ```
+    pub fn with_url(mut self, url: impl AsRef<str>) -> Self {
+        let maybe_parsed = Url::parse(url.as_ref());
+        match maybe_parsed {
+            Ok(parsed) => match parsed.scheme() {
+                "s3" | "s3a" => {
+                    self.bucket_name = parsed.host_str().map(|host| host.to_owned());
+                }
+                "https" => {
+                    if let Some(host) = parsed.host_str() {
+                        let parts = host.splitn(4, '.').collect::<Vec<&str>>();
+                        if parts.len() == 4 && parts[0] == "s3" && parts[2] == "amazonaws"
+                        {
+                            self.bucket_name = Some(parts[1].to_string());
+                        }
+                        if parts.len() == 4
+                            && parts[1] == "s3"
+                            && parts[3] == "amazonaws.com"
+                        {
+                            self.bucket_name = Some(parts[0].to_string());
+                            self.region = Some(parts[2].to_string());
+                            self.virtual_hosted_style_request = true;
+                        }
+                    }
+                }
+                other => {
+                    self.url_parse_error = Some(Error::UnknownUrlScheme {
+                        scheme: other.into(),
+                    });
+                }
+            },
+            Err(err) => {
+                self.url_parse_error = Some(Error::UnableToParseUrl {
+                    source: err,
+                    url: url.as_ref().into(),
+                });
+            }
+        };
+        self
+    }
+
     /// Set the AWS Access Key (required)
     pub fn with_access_key_id(mut self, access_key_id: impl Into<String>) -> Self {
         self.access_key_id = Some(access_key_id.into());
@@ -567,6 +642,10 @@ impl AmazonS3Builder {
     /// Create a [`AmazonS3`] instance from the provided values,
     /// consuming `self`.
     pub fn build(self) -> Result<AmazonS3> {
+        if let Some(err) = self.url_parse_error {
+            return Err(err.into());
+        }
+
         let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
         let region = self.region.context(MissingRegionSnafu)?;
 
@@ -642,8 +721,8 @@ impl AmazonS3Builder {
         let endpoint: String;
         let bucket_endpoint: String;
 
-        //If `endpoint` is provided then its assumed to be consistent with
-        // `virutal_hosted_style_request`. i.e. if `virtual_hosted_style_request` is true then
+        // If `endpoint` is provided then its assumed to be consistent with
+        // `virtual_hosted_style_request`. i.e. if `virtual_hosted_style_request` is true then
         // `endpoint` should have bucket name included.
         if self.virtual_hosted_style_request {
             endpoint = self.endpoint.unwrap_or_else(|| {
@@ -940,4 +1019,18 @@ mod tests {
             err
         );
     }
+
+    #[test]
+    fn s3_test_urls() {
+        let builder = AmazonS3Builder::new().with_url("s3://bucket/path");
+        assert_eq!(builder.bucket_name, Some("bucket".to_string()));
+
+        let builder = AmazonS3Builder::new().with_url("https://s3.bucket.amazonaws.com");
+        assert_eq!(builder.bucket_name, Some("bucket".to_string()));
+
+        let builder =
+            AmazonS3Builder::new().with_url("https://bucket.s3.region.amazonaws.com");
+        assert_eq!(builder.bucket_name, Some("bucket".to_string()));
+        assert_eq!(builder.region, Some("region".to_string()))
+    }
 }
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 4b7131ea8..2cc4fe1a4 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -114,6 +114,12 @@ enum Error {
 
     #[snafu(display("Azure credential error: {}", source), context(false))]
     Credential { source: credential::Error },
+
+    #[snafu(display(
+        "Unknown url scheme cannot be parsed into storage location: {}",
+        scheme
+    ))]
+    UnknownUrlScheme { scheme: String },
 }
 
 impl From<Error> for super::Error {
@@ -361,6 +367,7 @@ pub struct MicrosoftAzureBuilder {
     use_emulator: bool,
     retry_config: RetryConfig,
     client_options: ClientOptions,
+    url_parse_error: Option<Error>,
 }
 
 impl Debug for MicrosoftAzureBuilder {
@@ -379,7 +386,7 @@ impl MicrosoftAzureBuilder {
         Default::default()
     }
 
-    /// Create an instance of [MicrosoftAzureBuilder] with values pre-populated from environment variables.
+    /// Create an instance of [`MicrosoftAzureBuilder`] with values pre-populated from environment variables.
     ///
     /// Variables extracted from environment:
     /// * AZURE_STORAGE_ACCOUNT_NAME: storage account name
@@ -424,6 +431,78 @@ impl MicrosoftAzureBuilder {
         builder
     }
 
+    /// Parse available connection info from a well-known storage URL.
+    ///
+    /// The supported url schemes are:
+    ///
+    /// - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
+    /// - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
+    /// - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
+    /// - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
+    /// - `azure://<container>/<path>` (custom)
+    /// - `https://<account>.dfs.core.windows.net`
+    /// - `https://<account>.blob.core.windows.net`
+    ///
+    /// Please note that this is a best effort implementation, and will not fail for malformed URLs,
+    /// but rather warn and ignore the passed url. The url also has no effect on how the
+    /// storage is accessed - e.g. which driver or protocol is used for reading from the location.
+    ///
+    /// # Example
+    /// ```
+    /// use object_store::azure::MicrosoftAzureBuilder;
+    ///
+    /// let azure = MicrosoftAzureBuilder::from_env()
+    ///     .with_url("abfss://file_system@account.dfs.core.windows.net/")
+    ///     .build();
+    /// ```
+    pub fn with_url(mut self, url: impl AsRef<str>) -> Self {
+        let maybe_parsed = Url::parse(url.as_ref());
+        match maybe_parsed {
+            Ok(parsed) => match parsed.scheme() {
+                "az" | "adl" | "azure" => {
+                    self.container_name = parsed.host_str().map(|host| host.to_owned());
+                }
+                "abfs" | "abfss" => {
+                    // abfs(s) might refer to the fsspec convention abfs://<container>/<path>
+                    // or the convention for the hadoop driver abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>
+                    if parsed.username().is_empty() {
+                        self.container_name =
+                            parsed.host_str().map(|host| host.to_owned());
+                    } else if let Some(host) = parsed.host_str() {
+                        let parts = host.splitn(2, '.').collect::<Vec<&str>>();
+                        if parts.len() == 2 && parts[1] == "dfs.core.windows.net" {
+                            self.container_name = Some(parsed.username().to_owned());
+                            self.account_name = Some(parts[0].to_string());
+                        }
+                    }
+                }
+                "https" => {
+                    if let Some(host) = parsed.host_str() {
+                        let parts = host.splitn(2, '.').collect::<Vec<&str>>();
+                        if parts.len() == 2
+                            && (parts[1] == "dfs.core.windows.net"
+                                || parts[1] == "blob.core.windows.net")
+                        {
+                            self.account_name = Some(parts[0].to_string());
+                        }
+                    }
+                }
+                other => {
+                    self.url_parse_error = Some(Error::UnknownUrlScheme {
+                        scheme: other.into(),
+                    });
+                }
+            },
+            Err(err) => {
+                self.url_parse_error = Some(Error::UnableToParseUrl {
+                    source: err,
+                    url: url.as_ref().into(),
+                });
+            }
+        };
+        self
+    }
+
     /// Set the Azure Account (required)
     pub fn with_account(mut self, account: impl Into<String>) -> Self {
         self.account_name = Some(account.into());
@@ -529,8 +608,13 @@ impl MicrosoftAzureBuilder {
             retry_config,
             authority_host,
             mut client_options,
+            url_parse_error,
         } = self;
 
+        if let Some(err) = url_parse_error {
+            return Err(err.into());
+        }
+
         let container = container_name.ok_or(Error::MissingContainerName {})?;
 
         let (is_emulator, storage_url, auth, account) = if use_emulator {
@@ -716,4 +800,29 @@ mod tests {
         copy_if_not_exists(&integration).await;
         stream_get(&integration).await;
     }
+
+    #[test]
+    fn azure_blob_test_urls() {
+        let builder = MicrosoftAzureBuilder::new()
+            .with_url("abfss://file_system@account.dfs.core.windows.net/");
+        assert_eq!(builder.account_name, Some("account".to_string()));
+        assert_eq!(builder.container_name, Some("file_system".to_string()));
+
+        let builder = MicrosoftAzureBuilder::new().with_url("abfs://container/path");
+        assert_eq!(builder.container_name, Some("container".to_string()));
+
+        let builder = MicrosoftAzureBuilder::new().with_url("az://container");
+        assert_eq!(builder.container_name, Some("container".to_string()));
+
+        let builder = MicrosoftAzureBuilder::new().with_url("az://container/path");
+        assert_eq!(builder.container_name, Some("container".to_string()));
+
+        let builder = MicrosoftAzureBuilder::new()
+            .with_url("https://account.dfs.core.windows.net/");
+        assert_eq!(builder.account_name, Some("account".to_string()));
+
+        let builder = MicrosoftAzureBuilder::new()
+            .with_url("https://account.blob.core.windows.net/");
+        assert_eq!(builder.account_name, Some("account".to_string()))
+    }
 }
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index f93cbde3d..b3bd57256 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -44,6 +44,7 @@ use reqwest::header::RANGE;
 use reqwest::{header, Client, Method, Response, StatusCode};
 use snafu::{ResultExt, Snafu};
 use tokio::io::AsyncWrite;
+use url::Url;
 
 use crate::client::pagination::stream_paginated;
 use crate::client::retry::RetryExt;
@@ -129,6 +130,18 @@ enum Error {
         source: crate::client::retry::Error,
         path: String,
     },
+
+    #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
+    UnableToParseUrl {
+        source: url::ParseError,
+        url: String,
+    },
+
+    #[snafu(display(
+        "Unknown url scheme cannot be parsed into storage location: {}",
+        scheme
+    ))]
+    UnknownUrlScheme { scheme: String },
 }
 
 impl From<Error> for super::Error {
@@ -766,6 +779,7 @@ pub struct GoogleCloudStorageBuilder {
     service_account_path: Option<String>,
     retry_config: RetryConfig,
     client_options: ClientOptions,
+    url_parse_error: Option<Error>,
 }
 
 impl Default for GoogleCloudStorageBuilder {
@@ -775,6 +789,7 @@ impl Default for GoogleCloudStorageBuilder {
             service_account_path: None,
             retry_config: Default::default(),
             client_options: ClientOptions::new().with_allow_http(true),
+            url_parse_error: None,
         }
     }
 }
@@ -785,6 +800,75 @@ impl GoogleCloudStorageBuilder {
         Default::default()
     }
 
+    /// Create an instance of [`GoogleCloudStorageBuilder`] with values pre-populated from environment variables.
+    ///
+    /// Variables extracted from environment:
+    /// * GOOGLE_SERVICE_ACCOUNT: location of service account file
+    /// * SERVICE_ACCOUNT: (alias) location of service account file
+    ///
+    /// # Example
+    /// ```
+    /// use object_store::gcp::GoogleCloudStorageBuilder;
+    ///
+    /// let azure = GoogleCloudStorageBuilder::from_env()
+    ///     .with_bucket_name("foo")
+    ///     .build();
+    /// ```
+    pub fn from_env() -> Self {
+        let mut builder = Self::default();
+
+        if let Ok(service_account_path) = std::env::var("SERVICE_ACCOUNT") {
+            builder.service_account_path = Some(service_account_path);
+        }
+
+        if let Ok(service_account_path) = std::env::var("GOOGLE_SERVICE_ACCOUNT") {
+            builder.service_account_path = Some(service_account_path);
+        }
+
+        builder
+    }
+
+    /// Parse available connection info from a well-known storage URL.
+    ///
+    /// The supported url schemes are:
+    ///
+    /// - `gs://<bucket>/<path>`
+    ///
+    /// Please note that this is a best effort implementation, and will not fail for malformed URLs,
+    /// but rather warn and ignore the passed url. The url also has no effect on how the
+    /// storage is accessed - e.g. which driver or protocol is used for reading from the location.
+    ///
+    /// # Example
+    /// ```
+    /// use object_store::gcp::GoogleCloudStorageBuilder;
+    ///
+    /// let gcs = GoogleCloudStorageBuilder::from_env()
+    ///     .with_url("gs://bucket/path")
+    ///     .build();
+    /// ```
+    pub fn with_url(mut self, url: impl AsRef<str>) -> Self {
+        let maybe_parsed = Url::parse(url.as_ref());
+        match maybe_parsed {
+            Ok(parsed) => match parsed.scheme() {
+                "gs" => {
+                    self.bucket_name = parsed.host_str().map(|host| host.to_owned());
+                }
+                other => {
+                    self.url_parse_error = Some(Error::UnknownUrlScheme {
+                        scheme: other.into(),
+                    });
+                }
+            },
+            Err(err) => {
+                self.url_parse_error = Some(Error::UnableToParseUrl {
+                    source: err,
+                    url: url.as_ref().into(),
+                });
+            }
+        };
+        self
+    }
+
     /// Set the bucket name (required)
     pub fn with_bucket_name(mut self, bucket_name: impl Into<String>) -> Self {
         self.bucket_name = Some(bucket_name.into());
@@ -838,8 +922,13 @@ impl GoogleCloudStorageBuilder {
             service_account_path,
             retry_config,
             client_options,
+            url_parse_error,
         } = self;
 
+        if let Some(err) = url_parse_error {
+            return Err(err.into());
+        }
+
         let bucket_name = bucket_name.ok_or(Error::MissingBucketName {})?;
         let service_account_path =
             service_account_path.ok_or(Error::MissingServiceAccountPath)?;
@@ -1095,4 +1184,10 @@ mod test {
             err
         );
     }
+
+    #[test]
+    fn gcs_test_urls() {
+        let builder = GoogleCloudStorageBuilder::new().with_url("gs://bucket/path");
+        assert_eq!(builder.bucket_name, Some("bucket".to_string()))
+    }
 }