Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/13 21:04:41 UTC

[arrow-rs] branch master updated: feat: add token provider authorization to azure store (#2374)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 12a9d84f0 feat: add token provider authorization to azure store (#2374)
12a9d84f0 is described below

commit 12a9d84f0c7f8a5db20bfb7b0d7c2baf98d686aa
Author: Robert Pack <42...@users.noreply.github.com>
AuthorDate: Sat Aug 13 23:04:36 2022 +0200

    feat: add token provider authorization to azure store (#2374)
    
    * feat: add token provider authorization to azure store
    
    * Apply suggestions from code review
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    * feat: adopt latest APIs from latest version
    
    * chore: clippy
    
    * fix: lifetime issue
    
    * chore: better errors and docs
    
    * chore: fmt whitespace
    
    * fix: force first error in get method
    
    * chore: avoid unwrapping some options
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
 .gitignore                |   5 +-
 object_store/Cargo.toml   |  17 +-
 object_store/src/azure.rs | 390 +++++++++++++++++++++++++---------------------
 3 files changed, 227 insertions(+), 185 deletions(-)
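
For illustration, a hedged sketch of how the credential options added by this
commit might be used through the builder (the method names are taken from the
azure.rs diff below; the account, container, and credential values are
placeholders, and the helper function name is hypothetical):

    use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};

    fn build_client() -> object_store::Result<MicrosoftAzure> {
        // Authorize with an Azure AD service principal (client secret) rather
        // than an account access key. All string values are placeholders.
        MicrosoftAzureBuilder::new()
            .with_account("my-storage-account")
            .with_container_name("my-container")
            .with_tenant_id("my-tenant-id")
            .with_client_id("my-client-id")
            .with_client_secret("my-client-secret")
            .build()
    }

Per the build() logic in the diff, a bearer token takes precedence over an
access key, which in turn takes precedence over client-secret credentials; if
none of the three is supplied, build() fails with MissingCredentials.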

diff --git a/.gitignore b/.gitignore
index 2088dd5d2..5810e5add 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,4 +10,7 @@ venv/*
 parquet/data.parquet
 # release notes cache
 .githubchangeloggenerator.cache
-.githubchangeloggenerator.cache.log
\ No newline at end of file
+.githubchangeloggenerator.cache.log
+justfile
+.prettierignore
+.env
\ No newline at end of file
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index ffb65aaa7..bb371988a 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -22,11 +22,7 @@ edition = "2021"
 license = "MIT/Apache-2.0"
 readme = "README.md"
 description = "A generic object store interface for uniformly interacting with AWS S3, Google Cloud Storage, Azure Blob Storage and local files."
-keywords = [
-    "object",
-    "storage",
-    "cloud",
-]
+keywords = ["object", "storage", "cloud"]
 repository = "https://github.com/apache/arrow-rs"
 
 [package.metadata.docs.rs]
@@ -35,9 +31,10 @@ all-features = true
 [dependencies] # In alphabetical order
 async-trait = "0.1.53"
 # Microsoft Azure Blob storage integration
-azure_core = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
-azure_storage = { version = "0.2", optional = true, default-features = false, features = ["account"] }
-azure_storage_blobs = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
+azure_core = { version = "0.4", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
+azure_identity = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"]}
+azure_storage = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"]}
+azure_storage_blobs = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
 bytes = "1.0"
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 # Google Cloud Storage integration
@@ -70,7 +67,7 @@ url = "2.2"
 walkdir = "2"
 
 [features]
-azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"]
+azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest", "azure_identity"]
 azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"]
 gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"]
 aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "hyper-rustls"]
@@ -78,4 +75,4 @@ aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "
 [dev-dependencies] # In alphabetical order
 dotenv = "0.15.0"
 tempfile = "3.1.0"
-futures-test = "0.3"
\ No newline at end of file
+futures-test = "0.3"
diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs
index 6a5f53799..9987c0370 100644
--- a/object_store/src/azure.rs
+++ b/object_store/src/azure.rs
@@ -33,22 +33,26 @@ use crate::{
     GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
 };
 use async_trait::async_trait;
-use azure_core::{prelude::*, HttpClient};
-use azure_storage::core::prelude::{AsStorageClient, StorageAccountClient};
-use azure_storage_blobs::blob::responses::ListBlobsResponse;
+use azure_core::{
+    error::{Error as AzureError, ErrorKind as AzureErrorKind},
+    prelude::*,
+    StatusCode,
+};
+use azure_identity::{
+    AutoRefreshingTokenCredential, ClientSecretCredential, TokenCredentialOptions,
+};
+use azure_storage::core::clients::StorageClient;
 use azure_storage_blobs::blob::Blob;
-use azure_storage_blobs::{
-    prelude::{AsBlobClient, AsContainerClient, ContainerClient},
-    DeleteSnapshotsMethod,
+use azure_storage_blobs::container::operations::ListBlobsResponse;
+use azure_storage_blobs::prelude::{
+    AsContainerClient, ContainerClient, DeleteSnapshotsMethod,
 };
 use bytes::Bytes;
-use futures::{
-    future::BoxFuture,
-    stream::{self, BoxStream},
-    StreamExt, TryStreamExt,
-};
+use chrono::{TimeZone, Utc};
+use futures::{future::BoxFuture, stream::BoxStream, StreamExt, TryStreamExt};
 use snafu::{ResultExt, Snafu};
 use std::collections::BTreeSet;
+use std::fmt::{Debug, Formatter};
 use std::io;
 use std::{convert::TryInto, sync::Arc};
 use tokio::io::AsyncWrite;
@@ -66,7 +70,7 @@ enum Error {
         source,
     ))]
     UnableToDeleteData {
-        source: Box<dyn std::error::Error + Send + Sync>,
+        source: AzureError,
         container: String,
         path: String,
     },
@@ -79,7 +83,7 @@ enum Error {
         source,
     ))]
     UnableToGetData {
-        source: Box<dyn std::error::Error + Send + Sync>,
+        source: AzureError,
         container: String,
         path: String,
     },
@@ -92,7 +96,7 @@ enum Error {
         source,
     ))]
     UnableToHeadData {
-        source: Box<dyn std::error::Error + Send + Sync>,
+        source: AzureError,
         container: String,
         path: String,
     },
@@ -105,7 +109,7 @@ enum Error {
         source,
     ))]
     UnableToGetPieceOfData {
-        source: Box<dyn std::error::Error + Send + Sync>,
+        source: AzureError,
         container: String,
         path: String,
     },
@@ -118,7 +122,7 @@ enum Error {
         source,
     ))]
     UnableToPutData {
-        source: Box<dyn std::error::Error + Send + Sync>,
+        source: AzureError,
         container: String,
         path: String,
     },
@@ -130,7 +134,7 @@ enum Error {
         source,
     ))]
     UnableToListData {
-        source: Box<dyn std::error::Error + Send + Sync>,
+        source: AzureError,
         container: String,
     },
 
@@ -142,7 +146,7 @@ enum Error {
         source
     ))]
     UnableToCopyFile {
-        source: Box<dyn std::error::Error + Send + Sync>,
+        source: AzureError,
         container: String,
         from: String,
         to: String,
@@ -160,12 +164,12 @@ enum Error {
 
     NotFound {
         path: String,
-        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+        source: AzureError,
     },
 
     AlreadyExists {
         path: String,
-        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+        source: AzureError,
     },
 
     #[cfg(not(feature = "azure_test"))]
@@ -189,18 +193,24 @@ enum Error {
     #[snafu(display("Account must be specified"))]
     MissingAccount {},
 
-    #[snafu(display("Access key must be specified"))]
-    MissingAccessKey {},
-
     #[snafu(display("Container name must be specified"))]
     MissingContainerName {},
+
+    #[snafu(display("At least one authorization option must be specified"))]
+    MissingCredentials {},
 }
 
 impl From<Error> for super::Error {
     fn from(source: Error) -> Self {
         match source {
-            Error::NotFound { path, source } => Self::NotFound { path, source },
-            Error::AlreadyExists { path, source } => Self::AlreadyExists { path, source },
+            Error::NotFound { path, source } => Self::NotFound {
+                path,
+                source: Box::new(source),
+            },
+            Error::AlreadyExists { path, source } => Self::AlreadyExists {
+                path,
+                source: Box::new(source),
+            },
             _ => Self::Generic {
                 store: "Azure Blob Storage",
                 source: Box::new(source),
@@ -227,25 +237,15 @@ impl std::fmt::Display for MicrosoftAzure {
     }
 }
 
-#[allow(clippy::borrowed_box)]
-fn check_err_not_found(err: &Box<dyn std::error::Error + Send + Sync>) -> bool {
-    if let Some(azure_core::HttpError::StatusCode { status, .. }) =
-        err.downcast_ref::<azure_core::HttpError>()
-    {
-        return status.as_u16() == 404;
-    };
-    false
-}
-
 #[async_trait]
 impl ObjectStore for MicrosoftAzure {
     async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
         let bytes = bytes::BytesMut::from(&*bytes);
 
         self.container_client
-            .as_blob_client(location.as_ref())
+            .blob_client(location.as_ref())
             .put_block_blob(bytes)
-            .execute()
+            .into_future()
             .await
             .context(UnableToPutDataSnafu {
                 container: &self.container_name,
@@ -277,29 +277,32 @@ impl ObjectStore for MicrosoftAzure {
     }
 
     async fn get(&self, location: &Path) -> Result<GetResult> {
-        let blob = self
+        let loc = location.clone();
+        let mut stream = self
             .container_client
-            .as_blob_client(location.as_ref())
+            .blob_client(location.as_ref())
             .get()
-            .execute()
-            .await
-            .map_err(|err| {
-                if check_err_not_found(&err) {
-                    return Error::NotFound {
-                        source: err,
-                        path: location.to_string(),
-                    };
-                };
-                Error::UnableToGetData {
-                    source: err,
-                    container: self.container_name.clone(),
-                    path: location.to_string(),
-                }
-            })?;
+            .into_stream()
+            .and_then(|chunk| chunk.data.collect())
+            .map_err(move |err| match err.kind() {
+                AzureErrorKind::HttpResponse {
+                    status: StatusCode::NotFound,
+                    ..
+                } => crate::Error::NotFound {
+                    source: Box::new(err),
+                    path: loc.to_string(),
+                },
+                _ => crate::Error::Generic {
+                    source: Box::new(err),
+                    store: "MicrosoftAzure",
+                },
+            })
+            .boxed();
 
-        Ok(GetResult::Stream(
-            futures::stream::once(async move { Ok(blob.data) }).boxed(),
-        ))
+        let first = stream.next().await.transpose()?.unwrap_or_default();
+        Ok(GetResult::Stream(Box::pin(
+            futures::stream::once(async { Ok(first) }).chain(stream),
+        )))
     }
 
     async fn get_range(
@@ -307,49 +310,62 @@ impl ObjectStore for MicrosoftAzure {
         location: &Path,
         range: std::ops::Range<usize>,
     ) -> Result<Bytes> {
-        let blob = self
+        let map_azure_err = |err: AzureError| match err.kind() {
+            AzureErrorKind::HttpResponse {
+                status: StatusCode::NotFound,
+                ..
+            } => Error::NotFound {
+                source: err,
+                path: location.to_string(),
+            },
+            _ => Error::UnableToGetPieceOfData {
+                source: err,
+                container: self.container_name.clone(),
+                path: location.to_string(),
+            },
+        };
+
+        let mut stream = self
             .container_client
-            .as_blob_client(location.as_ref())
+            .blob_client(location.as_ref())
             .get()
             .range(range)
-            .execute()
-            .await
-            .map_err(|err| {
-                if check_err_not_found(&err) {
-                    return Error::NotFound {
-                        source: err,
-                        path: location.to_string(),
-                    };
-                };
-                Error::UnableToGetPieceOfData {
-                    source: err,
-                    container: self.container_name.clone(),
-                    path: location.to_string(),
-                }
-            })?;
+            .into_stream();
+
+        let mut chunk: Vec<u8> = vec![];
+        while let Some(value) = stream.next().await {
+            let value = value
+                .map_err(map_azure_err)?
+                .data
+                .collect()
+                .await
+                .map_err(map_azure_err)?;
+            chunk.extend(&value);
+        }
 
-        Ok(blob.data)
+        Ok(chunk.into())
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
         let res = self
             .container_client
-            .as_blob_client(location.as_ref())
+            .blob_client(location.as_ref())
             .get_properties()
-            .execute()
+            .into_future()
             .await
-            .map_err(|err| {
-                if check_err_not_found(&err) {
-                    return Error::NotFound {
-                        source: err,
-                        path: location.to_string(),
-                    };
-                };
-                Error::UnableToHeadData {
+            .map_err(|err| match err.kind() {
+                AzureErrorKind::HttpResponse {
+                    status: StatusCode::NotFound,
+                    ..
+                } => Error::NotFound {
+                    source: err,
+                    path: location.to_string(),
+                },
+                _ => Error::UnableToHeadData {
                     source: err,
                     container: self.container_name.clone(),
                     path: location.to_string(),
-                }
+                },
             })?;
 
         convert_object_meta(res.blob)?.ok_or_else(|| super::Error::NotFound {
@@ -360,10 +376,10 @@ impl ObjectStore for MicrosoftAzure {
 
     async fn delete(&self, location: &Path) -> Result<()> {
         self.container_client
-            .as_blob_client(location.as_ref())
+            .blob_client(location.as_ref())
             .delete()
             .delete_snapshots_method(DeleteSnapshotsMethod::Include)
-            .execute()
+            .into_future()
             .await
             .context(UnableToDeleteDataSnafu {
                 container: &self.container_name,
@@ -426,9 +442,9 @@ impl ObjectStore for MicrosoftAzure {
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
         let from_url = self.get_copy_from_url(from)?;
         self.container_client
-            .as_blob_client(to.as_ref())
-            .copy(&from_url)
-            .execute()
+            .blob_client(to.as_ref())
+            .copy(from_url)
+            .into_future()
             .await
             .context(UnableToCopyFileSnafu {
                 container: &self.container_name,
@@ -441,20 +457,20 @@ impl ObjectStore for MicrosoftAzure {
     async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
         let from_url = self.get_copy_from_url(from)?;
         self.container_client
-            .as_blob_client(to.as_ref())
-            .copy(&from_url)
-            .if_match_condition(IfMatchCondition::NotMatch("*".to_string()))
-            .execute()
+            .blob_client(to.as_ref())
+            .copy(from_url)
+            .if_match(IfMatchCondition::NotMatch("*".to_string()))
+            .into_future()
             .await
             .map_err(|err| {
-                if let Some(azure_core::HttpError::StatusCode { status, .. }) =
-                    err.downcast_ref::<azure_core::HttpError>()
+                if let AzureErrorKind::HttpResponse {
+                    status: StatusCode::Conflict,
+                    ..
+                } = err.kind()
                 {
-                    if status.as_u16() == 409 {
-                        return Error::AlreadyExists {
-                            source: err,
-                            path: to.to_string(),
-                        };
+                    return Error::AlreadyExists {
+                        source: err,
+                        path: to.to_string(),
                     };
                 };
                 Error::UnableToCopyFile {
@@ -486,60 +502,33 @@ impl MicrosoftAzure {
         prefix: Option<&Path>,
         delimiter: bool,
     ) -> Result<BoxStream<'_, Result<ListBlobsResponse>>> {
-        enum ListState {
-            Start,
-            HasMore(String),
-            Done,
+        let mut stream = self.container_client.list_blobs();
+        if let Some(prefix_val) = format_prefix(prefix) {
+            stream = stream.prefix(prefix_val);
+        }
+        if delimiter {
+            stream = stream.delimiter(Delimiter::new(DELIMITER));
         }
 
-        let prefix_raw = format_prefix(prefix);
-
-        Ok(stream::unfold(ListState::Start, move |state| {
-            let mut request = self.container_client.list_blobs();
-
-            if let Some(p) = prefix_raw.as_deref() {
-                request = request.prefix(p);
-            }
-
-            if delimiter {
-                request = request.delimiter(Delimiter::new(DELIMITER));
-            }
-
-            async move {
-                match state {
-                    ListState::HasMore(ref marker) => {
-                        request = request.next_marker(marker as &str);
-                    }
-                    ListState::Done => {
-                        return None;
-                    }
-                    ListState::Start => {}
-                }
-
-                let resp = match request.execute().await.context(UnableToListDataSnafu {
-                    container: &self.container_name,
-                }) {
-                    Ok(resp) => resp,
-                    Err(err) => return Some((Err(crate::Error::from(err)), state)),
-                };
-
-                let next_state = if let Some(marker) = &resp.next_marker {
-                    ListState::HasMore(marker.as_str().to_string())
-                } else {
-                    ListState::Done
-                };
+        let stream = stream
+            .into_stream()
+            .map(|resp| match resp {
+                Ok(list_blobs) => Ok(list_blobs),
+                Err(err) => Err(crate::Error::from(Error::UnableToListData {
+                    source: err,
+                    container: self.container_name.clone(),
+                })),
+            })
+            .boxed();
 
-                Some((Ok(resp), next_state))
-            }
-        })
-        .boxed())
+        Ok(stream)
     }
 }
 
 /// Returns `None` if is a directory
 fn convert_object_meta(blob: Blob) -> Result<Option<ObjectMeta>> {
     let location = Path::parse(blob.name)?;
-    let last_modified = blob.properties.last_modified;
+    let last_modified = Utc.timestamp(blob.properties.last_modified.unix_timestamp(), 0);
     let size = blob
         .properties
         .content_length
@@ -580,7 +569,7 @@ fn url_from_env(env_name: &str, default_url: &str) -> Result<Url> {
     Ok(url)
 }
 
-/// Configure a connection to Mirosoft Azure Blob Storage bucket using
+/// Configure a connection to Microsoft Azure Blob Storage container using
 /// the specified credentials.
 ///
 /// # Example
@@ -595,14 +584,28 @@ fn url_from_env(env_name: &str, default_url: &str) -> Result<Url> {
 ///  .with_container_name(BUCKET_NAME)
 ///  .build();
 /// ```
-#[derive(Debug, Default)]
+#[derive(Default)]
 pub struct MicrosoftAzureBuilder {
     account: Option<String>,
     access_key: Option<String>,
     container_name: Option<String>,
+    bearer_token: Option<String>,
+    client_id: Option<String>,
+    client_secret: Option<String>,
+    tenant_id: Option<String>,
     use_emulator: bool,
 }
 
+impl Debug for MicrosoftAzureBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "MicrosoftAzureBuilder {{ account: {:?}, container_name: {:?} }}",
+            self.account, self.container_name
+        )
+    }
+}
+
 impl MicrosoftAzureBuilder {
     /// Create a new [`MicrosoftAzureBuilder`] with default values.
     pub fn new() -> Self {
@@ -615,18 +618,46 @@ impl MicrosoftAzureBuilder {
         self
     }
 
-    /// Set the Azure Access Key (required)
+    /// Set the Azure Access Key (required - one of access key, bearer token, or client credentials)
     pub fn with_access_key(mut self, access_key: impl Into<String>) -> Self {
         self.access_key = Some(access_key.into());
         self
     }
 
+    /// Set a static bearer token to be used for authorizing requests
+    /// (required - one of access key, bearer token, or client credentials)
+    pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
+        self.bearer_token = Some(bearer_token.into());
+        self
+    }
+
     /// Set the Azure Container Name (required)
     pub fn with_container_name(mut self, container_name: impl Into<String>) -> Self {
         self.container_name = Some(container_name.into());
         self
     }
 
+    /// Set a client id used for client secret authorization
+    /// (required - one of access key, bearer token, or client credentials)
+    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
+        self.client_id = Some(client_id.into());
+        self
+    }
+
+    /// Set a client secret used for client secret authorization
+    /// (required - one of access key, bearer token, or client credentials)
+    pub fn with_client_secret(mut self, client_secret: impl Into<String>) -> Self {
+        self.client_secret = Some(client_secret.into());
+        self
+    }
+
+    /// Set the tenant id of the Azure AD tenant
+    /// (required - one of access key, bearer token, or client credentials)
+    pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
+        self.tenant_id = Some(tenant_id.into());
+        self
+    }
+
     /// Set if the Azure emulator should be used (defaults to false)
     pub fn with_use_emulator(mut self, use_emulator: bool) -> Self {
         self.use_emulator = use_emulator;
@@ -640,20 +671,20 @@ impl MicrosoftAzureBuilder {
             account,
             access_key,
             container_name,
+            bearer_token,
+            client_id,
+            client_secret,
+            tenant_id,
             use_emulator,
         } = self;
 
         let account = account.ok_or(Error::MissingAccount {})?;
-        let access_key = access_key.ok_or(Error::MissingAccessKey {})?;
         let container_name = container_name.ok_or(Error::MissingContainerName {})?;
 
-        let http_client: Arc<dyn HttpClient> = Arc::new(reqwest::Client::new());
-
-        let (is_emulator, storage_account_client) = if use_emulator {
+        let (is_emulator, storage_client) = if use_emulator {
             check_if_emulator_works()?;
             // Allow overriding defaults. Values taken from
             // from https://docs.rs/azure_storage/0.2.0/src/azure_storage/core/clients/storage_account_client.rs.html#129-141
-            let http_client = azure_core::new_http_client();
             let blob_storage_url =
                 url_from_env("AZURITE_BLOB_STORAGE_URL", "http://127.0.0.1:10000")?;
             let queue_storage_url =
@@ -663,8 +694,7 @@ impl MicrosoftAzureBuilder {
             let filesystem_url =
                 url_from_env("AZURITE_TABLE_STORAGE_URL", "http://127.0.0.1:10004")?;
 
-            let storage_client = StorageAccountClient::new_emulator(
-                http_client,
+            let storage_client = StorageClient::new_emulator(
                 &blob_storage_url,
                 &table_storage_url,
                 &queue_storage_url,
@@ -673,25 +703,37 @@ impl MicrosoftAzureBuilder {
 
             (true, storage_client)
         } else {
-            (
-                false,
-                StorageAccountClient::new_access_key(
-                    Arc::clone(&http_client),
-                    &account,
-                    &access_key,
-                ),
-            )
+            let client = if let Some(bearer_token) = bearer_token {
+                Ok(StorageClient::new_bearer_token(&account, bearer_token))
+            } else if let Some(access_key) = access_key {
+                Ok(StorageClient::new_access_key(&account, access_key))
+            } else if let (Some(client_id), Some(client_secret), Some(tenant_id)) =
+                (tenant_id, client_id, client_secret)
+            {
+                let credential = Arc::new(AutoRefreshingTokenCredential::new(Arc::new(
+                    ClientSecretCredential::new(
+                        tenant_id,
+                        client_id,
+                        client_secret,
+                        TokenCredentialOptions::default(),
+                    ),
+                )));
+                Ok(StorageClient::new_token_credential(&account, credential))
+            } else {
+                Err(Error::MissingCredentials {})
+            }?;
+
+            (false, client)
         };
 
-        let storage_client = storage_account_client.as_storage_client();
-        let blob_base_url = storage_account_client
+        let blob_base_url = storage_client
             .blob_storage_url()
             .as_ref()
             // make url ending consistent between the emulator and remote storage account
             .trim_end_matches('/')
             .to_string();
 
-        let container_client = storage_client.as_container_client(&container_name);
+        let container_client = Arc::new(storage_client.container_client(&container_name));
 
         Ok(MicrosoftAzure {
             container_client,
@@ -735,9 +777,9 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
 
         Box::pin(async move {
             client
-                .as_blob_client(location.as_ref())
+                .blob_client(location.as_ref())
                 .put_block(block_id.clone(), buf)
-                .execute()
+                .into_future()
                 .await
                 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
 
@@ -761,7 +803,7 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
                 .map(|(part_number, maybe_part)| match maybe_part {
                     Some(part) => {
                         Ok(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
-                            azure_storage_blobs::BlockId::new(part.content_id),
+                            azure_storage_blobs::prelude::BlockId::new(part.content_id),
                         ))
                     }
                     None => Err(io::Error::new(
@@ -779,9 +821,9 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
             };
 
             client
-                .as_blob_client(location.as_ref())
-                .put_block_list(&block_list)
-                .execute()
+                .blob_client(location.as_ref())
+                .put_block_list(block_list)
+                .into_future()
                 .await
                 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;