You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/08/21 07:57:32 UTC

[GitHub] [arrow-rs] roeap commented on a diff in pull request #2509: Replace azure sdk with custom implementation

roeap commented on code in PR #2509:
URL: https://github.com/apache/arrow-rs/pull/2509#discussion_r950803790


##########
object_store/src/azure/client.rs:
##########
@@ -0,0 +1,740 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::credential::{AzureCredential, CredentialProvider};
+use crate::azure::credential::*;
+use crate::client::pagination::stream_paginated;
+use crate::client::retry::RetryExt;
+use crate::path::DELIMITER;
+use crate::util::{encode_path, format_http_range, format_prefix};
+use crate::{BoxStream, ListResult, ObjectMeta, Path, Result, RetryConfig, StreamExt};
+use bytes::{Buf, Bytes};
+use chrono::{DateTime, TimeZone, Utc};
+use itertools::Itertools;
+use reqwest::{
+    header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH},
+    Client as ReqwestClient, Method, Response, StatusCode,
+};
+use serde::{Deserialize, Deserializer, Serialize};
+use snafu::{ResultExt, Snafu};
+use std::collections::HashMap;
+use std::ops::Range;
+
+/// A specialized `Error` for object store-related errors
+///
+/// Every variant carries the underlying `source` error; the request variants
+/// that operate on a single object additionally record the object `path`.
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub(crate) enum Error {
+    /// A get request for the object at `path` failed.
+    #[snafu(display("Error performing get request {}: {}", path, source))]
+    GetRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    /// A put request for the object at `path` failed.
+    #[snafu(display("Error performing put request {}: {}", path, source))]
+    PutRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    /// A delete request for the object at `path` failed.
+    #[snafu(display("Error performing delete request {}: {}", path, source))]
+    DeleteRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    /// A copy request involving the object at `path` failed.
+    #[snafu(display("Error performing copy request {}: {}", path, source))]
+    CopyRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    /// A list request failed.
+    #[snafu(display("Error performing list request: {}", source))]
+    ListRequest { source: reqwest::Error },
+
+    /// The request that creates a multipart upload failed.
+    #[snafu(display("Error performing create multipart request: {}", source))]
+    CreateMultipartRequest { source: reqwest::Error },
+
+    /// The request that completes a multipart upload failed.
+    #[snafu(display("Error performing complete multipart request: {}", source))]
+    CompleteMultipartRequest { source: reqwest::Error },
+
+    /// The body of a list response could not be deserialized.
+    #[snafu(display("Got invalid list response: {}", source))]
+    InvalidListResponse { source: quick_xml::de::DeError },
+
+    /// The body of a multipart response could not be deserialized.
+    #[snafu(display("Got invalid multipart response: {}", source))]
+    InvalidMultipartResponse { source: quick_xml::de::DeError },
+
+    /// Obtaining an authorization token failed.
+    #[snafu(display("Error authorizing request: {}", source))]
+    Authorization { source: crate::client::oauth::Error },
+}
+
+impl From<Error> for crate::Error {
+    /// Converts a client error into the crate-level error type.
+    ///
+    /// * HTTP 404 on a get, put, delete or copy request becomes `NotFound`.
+    /// * HTTP 409 on a copy request becomes `AlreadyExists`.
+    /// * Every other error is wrapped unchanged in `Generic`.
+    fn from(err: Error) -> Self {
+        // Only the per-object request variants have their HTTP status inspected.
+        let status = match &err {
+            Error::GetRequest { source, .. }
+            | Error::PutRequest { source, .. }
+            | Error::DeleteRequest { source, .. }
+            | Error::CopyRequest { source, .. } => source.status(),
+            _ => None,
+        };
+
+        match (err, status) {
+            (
+                Error::GetRequest { source, path }
+                | Error::PutRequest { source, path }
+                | Error::DeleteRequest { source, path }
+                | Error::CopyRequest { source, path },
+                Some(StatusCode::NOT_FOUND),
+            ) => Self::NotFound {
+                path,
+                source: Box::new(source),
+            },
+            (Error::CopyRequest { source, path }, Some(StatusCode::CONFLICT)) => {
+                Self::AlreadyExists {
+                    path,
+                    source: Box::new(source),
+                }
+            }
+            (other, _) => Self::Generic {
+                store: "MicrosoftAzure",
+                source: Box::new(other),
+            },
+        }
+    }
+}
+
+/// Configuration for [AzureClient]
+#[derive(Debug)]
+pub struct AzureConfig {
+    /// Storage account name. Passed to request authorization and, when
+    /// `is_emulator` is set, also used as a path segment of the request URL.
+    pub account: String,
+    /// Container name; always a path segment of the request URL.
+    pub container: String,
+    /// Source of the credential used to authorize each request.
+    pub credentials: CredentialProvider,
+    /// Retry settings used when sending requests and when fetching tokens.
+    pub retry_config: RetryConfig,
+    /// When `false`, the HTTP client is restricted to HTTPS.
+    pub allow_http: bool,
+    /// Base URL of the service; the prefix of URLs built by `path_url`.
+    pub service: String,
+    /// Whether requests target a storage emulator. This switches the URL
+    /// layout to include the account and disables the crc64 range header.
+    pub is_emulator: bool,
+}
+
+impl AzureConfig {
+    /// Builds the request URL for the object at `path`.
+    ///
+    /// For the emulator the layout is `service/account/container/path`;
+    /// otherwise it is `service/container/path`. The object path is passed
+    /// through `encode_path` in both cases.
+    fn path_url(&self, path: &Path) -> String {
+        let encoded = encode_path(path);
+        match self.is_emulator {
+            true => format!(
+                "{}/{}/{}/{}",
+                self.service, self.account, self.container, encoded
+            ),
+            false => format!("{}/{}/{}", self.service, self.container, encoded),
+        }
+    }
+}
+
+/// Client that issues HTTP requests described by an [AzureConfig].
+#[derive(Debug)]
+pub(crate) struct AzureClient {
+    /// Settings used to build URLs and to authorize and retry requests.
+    config: AzureConfig,
+    /// Underlying HTTP client used to send every request.
+    client: ReqwestClient,
+}
+
+impl AzureClient {
+    /// Builds an [AzureClient] around `config`.
+    ///
+    /// The underlying HTTP client is restricted to HTTPS unless
+    /// `config.allow_http` is set.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the HTTP client cannot be built.
+    pub fn new(config: AzureConfig) -> Self {
+        let https_only = !config.allow_http;
+        let client = reqwest::ClientBuilder::new()
+            .https_only(https_only)
+            .build()
+            .unwrap();
+
+        Self { client, config }
+    }
+
+    /// Borrows the configuration this client was constructed with.
+    pub fn config(&self) -> &AzureConfig {
+        let Self { config, .. } = self;
+        config
+    }
+
+    /// Resolves the configured [CredentialProvider] into a credential that
+    /// can be attached to a request.
+    ///
+    /// Access keys and SAS tokens are copied out of the configuration. For a
+    /// client secret, a token is fetched (using the configured retry
+    /// settings) and turned into a `Bearer` header value.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if fetching the token fails, or if the token cannot
+    /// be represented as a header value.
+    async fn get_credential(&self) -> Result<AzureCredential> {
+        let credential = match &self.config.credentials {
+            CredentialProvider::AccessKey(key) => {
+                AzureCredential::AccessKey(key.to_owned())
+            }
+            CredentialProvider::SASToken(sas) => {
+                AzureCredential::SASToken(sas.clone())
+            }
+            CredentialProvider::ClientSecret(cred) => {
+                let token = cred
+                    .fetch_token(&self.client, &self.config.retry_config)
+                    .await
+                    .context(AuthorizationSnafu)?;
+
+                // The conversion to a HeaderValue is fallible, so it happens
+                // here; the value is later used in an infallible function.
+                let bearer = format!("Bearer {}", token);
+                let header = HeaderValue::from_str(&bearer).map_err(|err| {
+                    crate::Error::Generic {
+                        store: "MicrosoftAzure",
+                        source: Box::new(err),
+                    }
+                })?;
+
+                AzureCredential::AuthorizationToken(header)
+            }
+        };
+
+        Ok(credential)
+    }
+
+    /// Make an Azure PUT request <https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob>
+    ///
+    /// * `path` - object the request targets; also recorded in any error.
+    /// * `bytes` - optional request body. When absent, an explicit
+    ///   `Content-Length: 0` header is sent, as `get_request` does for its
+    ///   bodiless requests.
+    /// * `is_block_op` - when `false`, the blob type header is set to
+    ///   `BlockBlob`; when `true` the header is omitted.
+    /// * `query` - query parameters appended to the URL.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the credential cannot be obtained, if sending the
+    /// request fails, or if the response carries an error status.
+    pub async fn put_request<T: Serialize + ?Sized + Sync>(
+        &self,
+        path: &Path,
+        bytes: Option<Bytes>,
+        is_block_op: bool,
+        query: &T,
+    ) -> Result<Response> {
+        let credential = self.get_credential().await?;
+        let url = self.config.path_url(path);
+
+        let mut builder = self.client.request(Method::PUT, url);
+
+        if !is_block_op {
+            builder = builder.header(&BLOB_TYPE, "BlockBlob");
+        }
+
+        // Content-Length is a required header for this operation, so it is
+        // always set explicitly, including for requests without a body.
+        builder = match bytes {
+            Some(bytes) => builder
+                .header(CONTENT_LENGTH, HeaderValue::from(bytes.len()))
+                .body(bytes),
+            None => builder.header(CONTENT_LENGTH, HeaderValue::from_static("0")),
+        };
+
+        let response = builder
+            .query(query)
+            .with_azure_authorization(&credential, &self.config.account)
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(PutRequestSnafu {
+                path: path.as_ref(),
+            })?
+            .error_for_status()
+            .context(PutRequestSnafu {
+                path: path.as_ref(),
+            })?;
+
+        Ok(response)
+    }
+
+    /// Make an Azure GET request
+    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
+    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
+    pub async fn get_request(
+        &self,
+        path: &Path,
+        range: Option<Range<usize>>,
+        head: bool,
+    ) -> Result<Response> {
+        let credential = self.get_credential().await?;
+        let url = self.config.path_url(path);
+        let method = match head {
+            true => Method::HEAD,
+            false => Method::GET,
+        };
+
+        let mut builder = self
+            .client
+            .request(method, url)
+            .header(CONTENT_LENGTH, HeaderValue::from_static("0"))
+            .body(Bytes::new());
+
+        if let Some(range) = range {
+            // Note: Azurite emulator does not support crc64 headers
+            if !self.config.is_emulator && range.end - range.start < 1024 * 1024 * 4 {
+                builder = builder
+                    .header(&RANGE_GET_CONTENT_CRC64, HeaderValue::from_static("true"));

Review Comment:
   very true :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org