You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/13 21:04:41 UTC
[arrow-rs] branch master updated: feat: add token provider authorization to azure store (#2374)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 12a9d84f0 feat: add token provider authorization to azure store (#2374)
12a9d84f0 is described below
commit 12a9d84f0c7f8a5db20bfb7b0d7c2baf98d686aa
Author: Robert Pack <42...@users.noreply.github.com>
AuthorDate: Sat Aug 13 23:04:36 2022 +0200
feat: add token provider authorization to azure store (#2374)
* feat: add token provider authorization to azure store
* Apply suggestions from code review
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
* feat: adopt latest APIs from latest version
* chore: clippy
* fix: lifetime issue
* chore: better errors and docs
* chore: fmt whitespace
* fix: force first error in get method
* chore: avoid unwrapping some options
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
.gitignore | 5 +-
object_store/Cargo.toml | 17 +-
object_store/src/azure.rs | 390 +++++++++++++++++++++++++---------------------
3 files changed, 227 insertions(+), 185 deletions(-)
diff --git a/.gitignore b/.gitignore
index 2088dd5d2..5810e5add 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,4 +10,7 @@ venv/*
parquet/data.parquet
# release notes cache
.githubchangeloggenerator.cache
-.githubchangeloggenerator.cache.log
\ No newline at end of file
+.githubchangeloggenerator.cache.log
+justfile
+.prettierignore
+.env
\ No newline at end of file
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index ffb65aaa7..bb371988a 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -22,11 +22,7 @@ edition = "2021"
license = "MIT/Apache-2.0"
readme = "README.md"
description = "A generic object store interface for uniformly interacting with AWS S3, Google Cloud Storage, Azure Blob Storage and local files."
-keywords = [
- "object",
- "storage",
- "cloud",
-]
+keywords = ["object", "storage", "cloud"]
repository = "https://github.com/apache/arrow-rs"
[package.metadata.docs.rs]
@@ -35,9 +31,10 @@ all-features = true
[dependencies] # In alphabetical order
async-trait = "0.1.53"
# Microsoft Azure Blob storage integration
-azure_core = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
-azure_storage = { version = "0.2", optional = true, default-features = false, features = ["account"] }
-azure_storage_blobs = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
+azure_core = { version = "0.4", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
+azure_identity = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"]}
+azure_storage = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"]}
+azure_storage_blobs = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
bytes = "1.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
# Google Cloud Storage integration
@@ -70,7 +67,7 @@ url = "2.2"
walkdir = "2"
[features]
-azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"]
+azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest", "azure_identity"]
azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"]
gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"]
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "hyper-rustls"]
@@ -78,4 +75,4 @@ aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "
[dev-dependencies] # In alphabetical order
dotenv = "0.15.0"
tempfile = "3.1.0"
-futures-test = "0.3"
\ No newline at end of file
+futures-test = "0.3"
diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs
index 6a5f53799..9987c0370 100644
--- a/object_store/src/azure.rs
+++ b/object_store/src/azure.rs
@@ -33,22 +33,26 @@ use crate::{
GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
};
use async_trait::async_trait;
-use azure_core::{prelude::*, HttpClient};
-use azure_storage::core::prelude::{AsStorageClient, StorageAccountClient};
-use azure_storage_blobs::blob::responses::ListBlobsResponse;
+use azure_core::{
+ error::{Error as AzureError, ErrorKind as AzureErrorKind},
+ prelude::*,
+ StatusCode,
+};
+use azure_identity::{
+ AutoRefreshingTokenCredential, ClientSecretCredential, TokenCredentialOptions,
+};
+use azure_storage::core::clients::StorageClient;
use azure_storage_blobs::blob::Blob;
-use azure_storage_blobs::{
- prelude::{AsBlobClient, AsContainerClient, ContainerClient},
- DeleteSnapshotsMethod,
+use azure_storage_blobs::container::operations::ListBlobsResponse;
+use azure_storage_blobs::prelude::{
+ AsContainerClient, ContainerClient, DeleteSnapshotsMethod,
};
use bytes::Bytes;
-use futures::{
- future::BoxFuture,
- stream::{self, BoxStream},
- StreamExt, TryStreamExt,
-};
+use chrono::{TimeZone, Utc};
+use futures::{future::BoxFuture, stream::BoxStream, StreamExt, TryStreamExt};
use snafu::{ResultExt, Snafu};
use std::collections::BTreeSet;
+use std::fmt::{Debug, Formatter};
use std::io;
use std::{convert::TryInto, sync::Arc};
use tokio::io::AsyncWrite;
@@ -66,7 +70,7 @@ enum Error {
source,
))]
UnableToDeleteData {
- source: Box<dyn std::error::Error + Send + Sync>,
+ source: AzureError,
container: String,
path: String,
},
@@ -79,7 +83,7 @@ enum Error {
source,
))]
UnableToGetData {
- source: Box<dyn std::error::Error + Send + Sync>,
+ source: AzureError,
container: String,
path: String,
},
@@ -92,7 +96,7 @@ enum Error {
source,
))]
UnableToHeadData {
- source: Box<dyn std::error::Error + Send + Sync>,
+ source: AzureError,
container: String,
path: String,
},
@@ -105,7 +109,7 @@ enum Error {
source,
))]
UnableToGetPieceOfData {
- source: Box<dyn std::error::Error + Send + Sync>,
+ source: AzureError,
container: String,
path: String,
},
@@ -118,7 +122,7 @@ enum Error {
source,
))]
UnableToPutData {
- source: Box<dyn std::error::Error + Send + Sync>,
+ source: AzureError,
container: String,
path: String,
},
@@ -130,7 +134,7 @@ enum Error {
source,
))]
UnableToListData {
- source: Box<dyn std::error::Error + Send + Sync>,
+ source: AzureError,
container: String,
},
@@ -142,7 +146,7 @@ enum Error {
source
))]
UnableToCopyFile {
- source: Box<dyn std::error::Error + Send + Sync>,
+ source: AzureError,
container: String,
from: String,
to: String,
@@ -160,12 +164,12 @@ enum Error {
NotFound {
path: String,
- source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ source: AzureError,
},
AlreadyExists {
path: String,
- source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ source: AzureError,
},
#[cfg(not(feature = "azure_test"))]
@@ -189,18 +193,24 @@ enum Error {
#[snafu(display("Account must be specified"))]
MissingAccount {},
- #[snafu(display("Access key must be specified"))]
- MissingAccessKey {},
-
#[snafu(display("Container name must be specified"))]
MissingContainerName {},
+
+ #[snafu(display("At least one authorization option must be specified"))]
+ MissingCredentials {},
}
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
- Error::NotFound { path, source } => Self::NotFound { path, source },
- Error::AlreadyExists { path, source } => Self::AlreadyExists { path, source },
+ Error::NotFound { path, source } => Self::NotFound {
+ path,
+ source: Box::new(source),
+ },
+ Error::AlreadyExists { path, source } => Self::AlreadyExists {
+ path,
+ source: Box::new(source),
+ },
_ => Self::Generic {
store: "Azure Blob Storage",
source: Box::new(source),
@@ -227,25 +237,15 @@ impl std::fmt::Display for MicrosoftAzure {
}
}
-#[allow(clippy::borrowed_box)]
-fn check_err_not_found(err: &Box<dyn std::error::Error + Send + Sync>) -> bool {
- if let Some(azure_core::HttpError::StatusCode { status, .. }) =
- err.downcast_ref::<azure_core::HttpError>()
- {
- return status.as_u16() == 404;
- };
- false
-}
-
#[async_trait]
impl ObjectStore for MicrosoftAzure {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let bytes = bytes::BytesMut::from(&*bytes);
self.container_client
- .as_blob_client(location.as_ref())
+ .blob_client(location.as_ref())
.put_block_blob(bytes)
- .execute()
+ .into_future()
.await
.context(UnableToPutDataSnafu {
container: &self.container_name,
@@ -277,29 +277,32 @@ impl ObjectStore for MicrosoftAzure {
}
async fn get(&self, location: &Path) -> Result<GetResult> {
- let blob = self
+ let loc = location.clone();
+ let mut stream = self
.container_client
- .as_blob_client(location.as_ref())
+ .blob_client(location.as_ref())
.get()
- .execute()
- .await
- .map_err(|err| {
- if check_err_not_found(&err) {
- return Error::NotFound {
- source: err,
- path: location.to_string(),
- };
- };
- Error::UnableToGetData {
- source: err,
- container: self.container_name.clone(),
- path: location.to_string(),
- }
- })?;
+ .into_stream()
+ .and_then(|chunk| chunk.data.collect())
+ .map_err(move |err| match err.kind() {
+ AzureErrorKind::HttpResponse {
+ status: StatusCode::NotFound,
+ ..
+ } => crate::Error::NotFound {
+ source: Box::new(err),
+ path: loc.to_string(),
+ },
+ _ => crate::Error::Generic {
+ source: Box::new(err),
+ store: "MicrosoftAzure",
+ },
+ })
+ .boxed();
- Ok(GetResult::Stream(
- futures::stream::once(async move { Ok(blob.data) }).boxed(),
- ))
+ let first = stream.next().await.transpose()?.unwrap_or_default();
+ Ok(GetResult::Stream(Box::pin(
+ futures::stream::once(async { Ok(first) }).chain(stream),
+ )))
}
async fn get_range(
@@ -307,49 +310,62 @@ impl ObjectStore for MicrosoftAzure {
location: &Path,
range: std::ops::Range<usize>,
) -> Result<Bytes> {
- let blob = self
+ let map_azure_err = |err: AzureError| match err.kind() {
+ AzureErrorKind::HttpResponse {
+ status: StatusCode::NotFound,
+ ..
+ } => Error::NotFound {
+ source: err,
+ path: location.to_string(),
+ },
+ _ => Error::UnableToGetPieceOfData {
+ source: err,
+ container: self.container_name.clone(),
+ path: location.to_string(),
+ },
+ };
+
+ let mut stream = self
.container_client
- .as_blob_client(location.as_ref())
+ .blob_client(location.as_ref())
.get()
.range(range)
- .execute()
- .await
- .map_err(|err| {
- if check_err_not_found(&err) {
- return Error::NotFound {
- source: err,
- path: location.to_string(),
- };
- };
- Error::UnableToGetPieceOfData {
- source: err,
- container: self.container_name.clone(),
- path: location.to_string(),
- }
- })?;
+ .into_stream();
+
+ let mut chunk: Vec<u8> = vec![];
+ while let Some(value) = stream.next().await {
+ let value = value
+ .map_err(map_azure_err)?
+ .data
+ .collect()
+ .await
+ .map_err(map_azure_err)?;
+ chunk.extend(&value);
+ }
- Ok(blob.data)
+ Ok(chunk.into())
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let res = self
.container_client
- .as_blob_client(location.as_ref())
+ .blob_client(location.as_ref())
.get_properties()
- .execute()
+ .into_future()
.await
- .map_err(|err| {
- if check_err_not_found(&err) {
- return Error::NotFound {
- source: err,
- path: location.to_string(),
- };
- };
- Error::UnableToHeadData {
+ .map_err(|err| match err.kind() {
+ AzureErrorKind::HttpResponse {
+ status: StatusCode::NotFound,
+ ..
+ } => Error::NotFound {
+ source: err,
+ path: location.to_string(),
+ },
+ _ => Error::UnableToHeadData {
source: err,
container: self.container_name.clone(),
path: location.to_string(),
- }
+ },
})?;
convert_object_meta(res.blob)?.ok_or_else(|| super::Error::NotFound {
@@ -360,10 +376,10 @@ impl ObjectStore for MicrosoftAzure {
async fn delete(&self, location: &Path) -> Result<()> {
self.container_client
- .as_blob_client(location.as_ref())
+ .blob_client(location.as_ref())
.delete()
.delete_snapshots_method(DeleteSnapshotsMethod::Include)
- .execute()
+ .into_future()
.await
.context(UnableToDeleteDataSnafu {
container: &self.container_name,
@@ -426,9 +442,9 @@ impl ObjectStore for MicrosoftAzure {
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from_url = self.get_copy_from_url(from)?;
self.container_client
- .as_blob_client(to.as_ref())
- .copy(&from_url)
- .execute()
+ .blob_client(to.as_ref())
+ .copy(from_url)
+ .into_future()
.await
.context(UnableToCopyFileSnafu {
container: &self.container_name,
@@ -441,20 +457,20 @@ impl ObjectStore for MicrosoftAzure {
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let from_url = self.get_copy_from_url(from)?;
self.container_client
- .as_blob_client(to.as_ref())
- .copy(&from_url)
- .if_match_condition(IfMatchCondition::NotMatch("*".to_string()))
- .execute()
+ .blob_client(to.as_ref())
+ .copy(from_url)
+ .if_match(IfMatchCondition::NotMatch("*".to_string()))
+ .into_future()
.await
.map_err(|err| {
- if let Some(azure_core::HttpError::StatusCode { status, .. }) =
- err.downcast_ref::<azure_core::HttpError>()
+ if let AzureErrorKind::HttpResponse {
+ status: StatusCode::Conflict,
+ ..
+ } = err.kind()
{
- if status.as_u16() == 409 {
- return Error::AlreadyExists {
- source: err,
- path: to.to_string(),
- };
+ return Error::AlreadyExists {
+ source: err,
+ path: to.to_string(),
};
};
Error::UnableToCopyFile {
@@ -486,60 +502,33 @@ impl MicrosoftAzure {
prefix: Option<&Path>,
delimiter: bool,
) -> Result<BoxStream<'_, Result<ListBlobsResponse>>> {
- enum ListState {
- Start,
- HasMore(String),
- Done,
+ let mut stream = self.container_client.list_blobs();
+ if let Some(prefix_val) = format_prefix(prefix) {
+ stream = stream.prefix(prefix_val);
+ }
+ if delimiter {
+ stream = stream.delimiter(Delimiter::new(DELIMITER));
}
- let prefix_raw = format_prefix(prefix);
-
- Ok(stream::unfold(ListState::Start, move |state| {
- let mut request = self.container_client.list_blobs();
-
- if let Some(p) = prefix_raw.as_deref() {
- request = request.prefix(p);
- }
-
- if delimiter {
- request = request.delimiter(Delimiter::new(DELIMITER));
- }
-
- async move {
- match state {
- ListState::HasMore(ref marker) => {
- request = request.next_marker(marker as &str);
- }
- ListState::Done => {
- return None;
- }
- ListState::Start => {}
- }
-
- let resp = match request.execute().await.context(UnableToListDataSnafu {
- container: &self.container_name,
- }) {
- Ok(resp) => resp,
- Err(err) => return Some((Err(crate::Error::from(err)), state)),
- };
-
- let next_state = if let Some(marker) = &resp.next_marker {
- ListState::HasMore(marker.as_str().to_string())
- } else {
- ListState::Done
- };
+ let stream = stream
+ .into_stream()
+ .map(|resp| match resp {
+ Ok(list_blobs) => Ok(list_blobs),
+ Err(err) => Err(crate::Error::from(Error::UnableToListData {
+ source: err,
+ container: self.container_name.clone(),
+ })),
+ })
+ .boxed();
- Some((Ok(resp), next_state))
- }
- })
- .boxed())
+ Ok(stream)
}
}
/// Returns `None` if is a directory
fn convert_object_meta(blob: Blob) -> Result<Option<ObjectMeta>> {
let location = Path::parse(blob.name)?;
- let last_modified = blob.properties.last_modified;
+ let last_modified = Utc.timestamp(blob.properties.last_modified.unix_timestamp(), 0);
let size = blob
.properties
.content_length
@@ -580,7 +569,7 @@ fn url_from_env(env_name: &str, default_url: &str) -> Result<Url> {
Ok(url)
}
-/// Configure a connection to Mirosoft Azure Blob Storage bucket using
+/// Configure a connection to Microsoft Azure Blob Storage container using
/// the specified credentials.
///
/// # Example
@@ -595,14 +584,28 @@ fn url_from_env(env_name: &str, default_url: &str) -> Result<Url> {
/// .with_container_name(BUCKET_NAME)
/// .build();
/// ```
-#[derive(Debug, Default)]
+#[derive(Default)]
pub struct MicrosoftAzureBuilder {
account: Option<String>,
access_key: Option<String>,
container_name: Option<String>,
+ bearer_token: Option<String>,
+ client_id: Option<String>,
+ client_secret: Option<String>,
+ tenant_id: Option<String>,
use_emulator: bool,
}
+impl Debug for MicrosoftAzureBuilder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "MicrosoftAzureBuilder {{ account: {:?}, container_name: {:?} }}",
+ self.account, self.container_name
+ )
+ }
+}
+
impl MicrosoftAzureBuilder {
/// Create a new [`MicrosoftAzureBuilder`] with default values.
pub fn new() -> Self {
@@ -615,18 +618,46 @@ impl MicrosoftAzureBuilder {
self
}
- /// Set the Azure Access Key (required)
+ /// Set the Azure Access Key (required - one of access key, bearer token, or client credentials)
pub fn with_access_key(mut self, access_key: impl Into<String>) -> Self {
self.access_key = Some(access_key.into());
self
}
+ /// Set a static bearer token to be used for authorizing requests
+ /// (required - one of access key, bearer token, or client credentials)
+ pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
+ self.bearer_token = Some(bearer_token.into());
+ self
+ }
+
/// Set the Azure Container Name (required)
pub fn with_container_name(mut self, container_name: impl Into<String>) -> Self {
self.container_name = Some(container_name.into());
self
}
+ /// Set a client id used for client secret authorization
+ /// (required - one of access key, bearer token, or client credentials)
+ pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
+ self.client_id = Some(client_id.into());
+ self
+ }
+
+ /// Set a client secret used for client secret authorization
+ /// (required - one of access key, bearer token, or client credentials)
+ pub fn with_client_secret(mut self, client_secret: impl Into<String>) -> Self {
+ self.client_secret = Some(client_secret.into());
+ self
+ }
+
+ /// Set the tenant id of the Azure AD tenant
+ /// (required - one of access key, bearer token, or client credentials)
+ pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
+ self.tenant_id = Some(tenant_id.into());
+ self
+ }
+
/// Set if the Azure emulator should be used (defaults to false)
pub fn with_use_emulator(mut self, use_emulator: bool) -> Self {
self.use_emulator = use_emulator;
@@ -640,20 +671,20 @@ impl MicrosoftAzureBuilder {
account,
access_key,
container_name,
+ bearer_token,
+ client_id,
+ client_secret,
+ tenant_id,
use_emulator,
} = self;
let account = account.ok_or(Error::MissingAccount {})?;
- let access_key = access_key.ok_or(Error::MissingAccessKey {})?;
let container_name = container_name.ok_or(Error::MissingContainerName {})?;
- let http_client: Arc<dyn HttpClient> = Arc::new(reqwest::Client::new());
-
- let (is_emulator, storage_account_client) = if use_emulator {
+ let (is_emulator, storage_client) = if use_emulator {
check_if_emulator_works()?;
// Allow overriding defaults. Values taken from
// from https://docs.rs/azure_storage/0.2.0/src/azure_storage/core/clients/storage_account_client.rs.html#129-141
- let http_client = azure_core::new_http_client();
let blob_storage_url =
url_from_env("AZURITE_BLOB_STORAGE_URL", "http://127.0.0.1:10000")?;
let queue_storage_url =
@@ -663,8 +694,7 @@ impl MicrosoftAzureBuilder {
let filesystem_url =
url_from_env("AZURITE_TABLE_STORAGE_URL", "http://127.0.0.1:10004")?;
- let storage_client = StorageAccountClient::new_emulator(
- http_client,
+ let storage_client = StorageClient::new_emulator(
&blob_storage_url,
&table_storage_url,
&queue_storage_url,
@@ -673,25 +703,37 @@ impl MicrosoftAzureBuilder {
(true, storage_client)
} else {
- (
- false,
- StorageAccountClient::new_access_key(
- Arc::clone(&http_client),
- &account,
- &access_key,
- ),
- )
+ let client = if let Some(bearer_token) = bearer_token {
+ Ok(StorageClient::new_bearer_token(&account, bearer_token))
+ } else if let Some(access_key) = access_key {
+ Ok(StorageClient::new_access_key(&account, access_key))
+ } else if let (Some(client_id), Some(client_secret), Some(tenant_id)) =
+ (tenant_id, client_id, client_secret)
+ {
+ let credential = Arc::new(AutoRefreshingTokenCredential::new(Arc::new(
+ ClientSecretCredential::new(
+ tenant_id,
+ client_id,
+ client_secret,
+ TokenCredentialOptions::default(),
+ ),
+ )));
+ Ok(StorageClient::new_token_credential(&account, credential))
+ } else {
+ Err(Error::MissingCredentials {})
+ }?;
+
+ (false, client)
};
- let storage_client = storage_account_client.as_storage_client();
- let blob_base_url = storage_account_client
+ let blob_base_url = storage_client
.blob_storage_url()
.as_ref()
// make url ending consistent between the emulator and remote storage account
.trim_end_matches('/')
.to_string();
- let container_client = storage_client.as_container_client(&container_name);
+ let container_client = Arc::new(storage_client.container_client(&container_name));
Ok(MicrosoftAzure {
container_client,
@@ -735,9 +777,9 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
Box::pin(async move {
client
- .as_blob_client(location.as_ref())
+ .blob_client(location.as_ref())
.put_block(block_id.clone(), buf)
- .execute()
+ .into_future()
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
@@ -761,7 +803,7 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
.map(|(part_number, maybe_part)| match maybe_part {
Some(part) => {
Ok(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
- azure_storage_blobs::BlockId::new(part.content_id),
+ azure_storage_blobs::prelude::BlockId::new(part.content_id),
))
}
None => Err(io::Error::new(
@@ -779,9 +821,9 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
};
client
- .as_blob_client(location.as_ref())
- .put_block_list(&block_list)
- .execute()
+ .blob_client(location.as_ref())
+ .put_block_list(block_list)
+ .into_future()
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;