Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/27 09:31:39 UTC

[arrow-rs] branch master updated: feat: support bulk deletes in object_store (#4060)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 770e241ce feat: support bulk deletes in object_store (#4060)
770e241ce is described below

commit 770e241ceac89d693dd7577c72266f6dad48c9e2
Author: Will Jones <wi...@gmail.com>
AuthorDate: Sat May 27 02:31:34 2023 -0700

    feat: support bulk deletes in object_store (#4060)
    
    * feat: support bulk deletes
    
    * fix: make NotFound reporting consistent
    
    * fix http store
    
    * fix aws support
    
    * remove unnecessary flag
    
    * fix: make AWS S3 compatible
    
    * pr feedback: use simpler API
    
    * pr feedback: test paths and ordering
    
    * Update object_store/src/limit.rs
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    * take fallible stream
    
    * final pr feedback
    
    ---------
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
 object_store/CONTRIBUTING.md    |  14 +++-
 object_store/Cargo.toml         |   2 +-
 object_store/src/aws/client.rs  | 175 ++++++++++++++++++++++++++++++++++++++++
 object_store/src/aws/mod.rs     |  21 +++++
 object_store/src/http/client.rs |   8 +-
 object_store/src/lib.rs         | 123 ++++++++++++++++++++++++++--
 object_store/src/limit.rs       |   7 ++
 object_store/src/local.rs       |   9 ++-
 8 files changed, 345 insertions(+), 14 deletions(-)
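
The new `ObjectStore::delete_stream` API accepts a stream of paths and yields
per-path results, letting stores such as S3 batch many deletions into one
DeleteObjects request while other stores fall back to concurrent single
deletes. A minimal sketch of calling it (the paths and the `remove_batch`
helper are hypothetical; any `ObjectStore` implementation works):

    use futures::{StreamExt, TryStreamExt};
    use object_store::{path::Path, ObjectStore, Result};

    /// Delete a known list of objects, failing on the first error.
    async fn remove_batch(store: &dyn ObjectStore) -> Result<()> {
        let paths = vec![
            Path::from("data/part-0.parquet"),
            Path::from("data/part-1.parquet"),
        ];
        let deleted: Vec<Path> = store
            .delete_stream(futures::stream::iter(paths).map(Ok).boxed())
            .try_collect()
            .await?;
        println!("deleted {} objects", deleted.len());
        Ok(())
    }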

diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md
index 47c294022..aeb38e13a 100644
--- a/object_store/CONTRIBUTING.md
+++ b/object_store/CONTRIBUTING.md
@@ -39,7 +39,8 @@ To test the S3 integration against [localstack](https://localstack.cloud/)
 First start up a container running localstack
 
 ```
-$ podman run --rm -it -e PROVIDER_OVERRIDE_S3=asf -p 4566:4566 -p 4510-4559:4510-4559 localstack/localstack 
+$ podman run -d -p 4566:4566 localstack/localstack:2.0
+$ podman run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
 ```
 
 Setup environment
@@ -87,13 +88,18 @@ $ podman run -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azur
 Create a bucket
 
 ```
-$ podman run --net=host mcr.microsoft.com/azure-cli az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://128.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://128.0.0.1:10001/devstoreaccount1;'
+$ podman run --net=host mcr.microsoft.com/azure-cli az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;'
 ```
 
 Run tests
 
-```
-$ cargo test --features azure
+```shell
+AZURE_USE_EMULATOR=1 \
+TEST_INTEGRATION=1 \
+OBJECT_STORE_BUCKET=test-bucket \
+AZURE_STORAGE_ACCOUNT=devstoreaccount1 \
+AZURE_STORAGE_ACCESS_KEY=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== \
+cargo test --features azure
 ```
 
 ### GCP
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 1fb988642..28bf29f7f 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -45,7 +45,7 @@ walkdir = "2"
 # Cloud storage support
 base64 = { version = "0.21", default-features = false, features = ["std"], optional = true }
 hyper = { version = "0.14", default-features = false, optional = true }
-quick-xml = { version = "0.28.0", features = ["serialize"], optional = true }
+quick-xml = { version = "0.28.0", features = ["serialize", "overlapped-lists"], optional = true }
 serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
 serde_json = { version = "1.0", default-features = false, optional = true }
 rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
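
The new "overlapped-lists" feature lets quick-xml's serde support buffer
sibling elements whose tags interleave, which the S3 DeleteObjects response
produces: <Deleted> and <Error> entries appear in whatever order the service
processed the keys. A minimal sketch of deserializing such a document,
mirroring the response types introduced in aws/client.rs below (struct names
here are illustrative; assumes serde plus quick-xml with the features above):

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct DeleteResult {
        // Capture child elements in document order, whichever tag they have
        #[serde(rename = "$value")]
        content: Vec<Entry>,
    }

    // Variant names match the <Deleted> and <Error> element names
    #[derive(Debug, Deserialize)]
    enum Entry {
        Deleted(Deleted),
        Error(DeleteError),
    }

    #[derive(Debug, Deserialize)]
    #[serde(rename_all = "PascalCase")]
    struct Deleted { key: String }

    #[derive(Debug, Deserialize)]
    #[serde(rename_all = "PascalCase")]
    struct DeleteError { key: String, code: String, message: String }

    fn main() {
        let xml = "<DeleteResult>\
            <Deleted><Key>a</Key></Deleted>\
            <Error><Key>b</Key><Code>AccessDenied</Code><Message>denied</Message></Error>\
            <Deleted><Key>c</Key></Deleted>\
            </DeleteResult>";
        let result: DeleteResult = quick_xml::de::from_str(xml).unwrap();
        println!("{:?}", result.content); // two Deleted entries around one Error
    }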
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index cfce35254..0c2493651 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -32,7 +32,9 @@ use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
+use itertools::Itertools;
 use percent_encoding::{utf8_percent_encode, PercentEncode};
+use quick_xml::events::{self as xml_events};
 use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
@@ -66,6 +68,29 @@ pub(crate) enum Error {
         path: String,
     },
 
+    #[snafu(display("Error performing DeleteObjects request: {}", source))]
+    DeleteObjectsRequest { source: crate::client::retry::Error },
+
+    #[snafu(display(
+        "DeleteObjects request failed for key {}: {} (code: {})",
+        path,
+        message,
+        code
+    ))]
+    DeleteFailed {
+        path: String,
+        code: String,
+        message: String,
+    },
+
+    #[snafu(display("Error getting DeleteObjects response body: {}", source))]
+    DeleteObjectsResponse { source: reqwest::Error },
+
+    #[snafu(display("Got invalid DeleteObjects response: {}", source))]
+    InvalidDeleteObjectsResponse {
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
     #[snafu(display("Error performing copy request {}: {}", path, source))]
     CopyRequest {
         source: crate::client::retry::Error,
@@ -129,6 +154,44 @@ struct MultipartPart {
     part_number: usize,
 }
 
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
+struct BatchDeleteResponse {
+    #[serde(rename = "$value")]
+    content: Vec<DeleteObjectResult>,
+}
+
+#[derive(Deserialize)]
+enum DeleteObjectResult {
+    Deleted(DeletedObject),
+    Error(DeleteError),
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase", rename = "Deleted")]
+struct DeletedObject {
+    #[allow(dead_code)]
+    key: String,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase", rename = "Error")]
+struct DeleteError {
+    key: String,
+    code: String,
+    message: String,
+}
+
+impl From<DeleteError> for Error {
+    fn from(err: DeleteError) -> Self {
+        Self::DeleteFailed {
+            path: err.key,
+            code: err.code,
+            message: err.message,
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct S3Config {
     pub region: String,
@@ -243,6 +306,118 @@ impl S3Client {
         Ok(())
     }
 
+    /// Make an S3 Delete Objects request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
+    ///
+    /// Produces a vector of results, one for each path in the input vector. If
+    /// the delete was successful, the path is returned in the `Ok` variant. If
+    /// there was an error for a certain path, the error will be returned in the
+    /// vector. If there was an issue with making the overall request, an error
+    /// will be returned at the top level.
+    pub async fn bulk_delete_request(
+        &self,
+        paths: Vec<Path>,
+    ) -> Result<Vec<Result<Path>>> {
+        if paths.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let credential = self.get_credential().await?;
+        let url = format!("{}?delete", self.config.bucket_endpoint);
+
+        let mut buffer = Vec::new();
+        let mut writer = quick_xml::Writer::new(&mut buffer);
+        writer
+            .write_event(xml_events::Event::Start(
+                xml_events::BytesStart::new("Delete").with_attributes([(
+                    "xmlns",
+                    "http://s3.amazonaws.com/doc/2006-03-01/",
+                )]),
+            ))
+            .unwrap();
+        for path in &paths {
+            // <Object><Key>{path}</Key></Object>
+            writer
+                .write_event(xml_events::Event::Start(xml_events::BytesStart::new(
+                    "Object",
+                )))
+                .unwrap();
+            writer
+                .write_event(xml_events::Event::Start(xml_events::BytesStart::new("Key")))
+                .unwrap();
+            writer
+                .write_event(xml_events::Event::Text(xml_events::BytesText::new(
+                    path.as_ref(),
+                )))
+                .map_err(|err| crate::Error::Generic {
+                    store: STORE,
+                    source: Box::new(err),
+                })?;
+            writer
+                .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Key")))
+                .unwrap();
+            writer
+                .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Object")))
+                .unwrap();
+        }
+        writer
+            .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Delete")))
+            .unwrap();
+
+        let body = Bytes::from(buffer);
+
+        let mut builder = self.client.request(Method::POST, url);
+
+        // Compute checksum - S3 *requires* this for DeleteObjects requests, so we default to
+        // SHA-256 if the user hasn't configured a checksum algorithm.
+        let checksum = self.config().checksum.unwrap_or(Checksum::SHA256);
+        let digest = checksum.digest(&body);
+        builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
+        let payload_sha256 = if checksum == Checksum::SHA256 {
+            Some(digest)
+        } else {
+            None
+        };
+
+        let response = builder
+            .header(CONTENT_TYPE, "application/xml")
+            .body(body)
+            .with_aws_sigv4(
+                credential.as_ref(),
+                &self.config.region,
+                "s3",
+                self.config.sign_payload,
+                payload_sha256.as_deref(),
+            )
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(DeleteObjectsRequestSnafu {})?
+            .bytes()
+            .await
+            .context(DeleteObjectsResponseSnafu {})?;
+
+        let response: BatchDeleteResponse = quick_xml::de::from_reader(response.reader())
+            .map_err(|err| Error::InvalidDeleteObjectsResponse {
+                source: Box::new(err),
+            })?;
+
+        // Assume all were ok, then fill in errors. This guarantees output order
+        // matches input order.
+        let mut results: Vec<Result<Path>> = paths.iter().cloned().map(Ok).collect();
+        for content in response.content.into_iter() {
+            if let DeleteObjectResult::Error(error) = content {
+                let path = Path::parse(&error.key).map_err(|err| {
+                    Error::InvalidDeleteObjectsResponse {
+                        source: Box::new(err),
+                    }
+                })?;
+                let i = paths.iter().find_position(|&p| p == &path).unwrap().0;
+                results[i] = Err(Error::from(error).into());
+            }
+        }
+
+        Ok(results)
+    }
+
     /// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
     pub async fn copy_request(&self, from: &Path, to: &Path) -> Result<()> {
         let credential = self.get_credential().await?;
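
On the wire, `bulk_delete_request` POSTs an XML body of the form
<Delete><Object><Key>path</Key></Object>...</Delete> to `{bucket}?delete`.
S3 rejects DeleteObjects requests that carry no checksum header, which is why
the code above defaults to SHA-256 when none is configured. A sketch of the
header it computes (illustrative only: this uses the `sha2` and `base64`
crates directly rather than the crate's own `Checksum` type):

    use base64::prelude::BASE64_STANDARD;
    use base64::Engine;
    use sha2::{Digest, Sha256};

    /// Base64-encoded SHA-256 of the request body, sent as x-amz-checksum-sha256
    fn checksum_header(body: &[u8]) -> (&'static str, String) {
        let digest = Sha256::digest(body);
        ("x-amz-checksum-sha256", BASE64_STANDARD.encode(digest))
    }

    fn main() {
        let body = b"<Delete><Object><Key>foo</Key></Object></Delete>";
        let (name, value) = checksum_header(body);
        println!("{name}: {value}");
    }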
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index fac6165b5..3696e4ad4 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -34,6 +34,7 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
@@ -252,6 +253,26 @@ impl ObjectStore for AmazonS3 {
         self.client.delete_request(location, &()).await
     }
 
+    fn delete_stream<'a>(
+        &'a self,
+        locations: BoxStream<'a, Result<Path>>,
+    ) -> BoxStream<'a, Result<Path>> {
+        locations
+            .try_chunks(1_000)
+            .map(move |locations| async {
+                // Return the error early, discarding the paths already
+                // collected into this chunk.
+                let locations = locations.map_err(|e| e.1)?;
+                self.client
+                    .bulk_delete_request(locations)
+                    .await
+                    .map(futures::stream::iter)
+            })
+            .buffered(20)
+            .try_flatten()
+            .boxed()
+    }
+
     async fn list(
         &self,
         prefix: Option<&Path>,
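
The override above chunks the incoming stream into groups of up to 1,000 paths
(the DeleteObjects per-request limit) and keeps up to 20 bulk requests in
flight, while `buffered` preserves the input order of the results. A
self-contained sketch of that chunk-and-fan-out pattern, with plain integers
standing in for paths and no real S3 requests:

    use futures::{stream, StreamExt, TryStreamExt};

    fn main() {
        let items = stream::iter((0..10).map(Ok::<i32, std::io::Error>));
        let out: Vec<i32> = futures::executor::block_on(async {
            items
                .try_chunks(3) // group into Vec<i32> of at most 3 items
                .map(|chunk| async move {
                    // On error, TryChunksError carries the items gathered so
                    // far plus the underlying error; propagate just the error.
                    let chunk = chunk.map_err(|e| e.1)?;
                    // A real implementation would issue one bulk request per
                    // chunk here and stream back its per-item results.
                    Ok::<_, std::io::Error>(stream::iter(
                        chunk.into_iter().map(Ok::<i32, std::io::Error>),
                    ))
                })
                .buffered(2) // up to 2 chunks in flight concurrently
                .try_flatten()
                .try_collect::<Vec<_>>()
                .await
        })
        .unwrap();
        assert_eq!(out, (0..10).collect::<Vec<_>>());
    }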
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 6feacbba6..1d3df34db 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -225,7 +225,13 @@ impl Client {
             .delete(url)
             .send_retry(&self.retry_config)
             .await
-            .context(RequestSnafu)?;
+            .map_err(|source| match source.status() {
+                Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
+                    source: Box::new(source),
+                    path: path.to_string(),
+                },
+                _ => Error::Request { source }.into(),
+            })?;
         Ok(())
     }
 
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 7116a8732..c5bf40cc4 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -386,6 +386,63 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// Delete the object at the specified location.
     async fn delete(&self, location: &Path) -> Result<()>;
 
+    /// Delete all the objects at the specified locations
+    ///
+    /// When supported, this method will use bulk operations that delete more
+    /// than one object per request. The default implementation will call
+    /// the single object delete method for each location, but with up to 10
+    /// concurrent requests.
+    ///
+    /// The returned stream yields the results of the delete operations in the
+    /// same order as the input locations. However, some errors will be from
+    /// an overall call to a bulk delete operation, and not from a specific
+    /// location.
+    ///
+    /// If the object did not exist, the result may be an error or a success,
+    /// depending on the behavior of the underlying store. For example, local
+    /// filesystems, GCP, and Azure return an error, while S3 and in-memory will
+    /// return Ok. If it is an error, it will be [`Error::NotFound`].
+    ///
+    /// ```
+    /// # use object_store::local::LocalFileSystem;
+    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+    /// # let root = tempfile::TempDir::new().unwrap();
+    /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+    /// use object_store::{ObjectStore, ObjectMeta};
+    /// use object_store::path::Path;
+    /// use futures::{StreamExt, TryStreamExt};
+    /// use bytes::Bytes;
+    ///
+    /// // Create two objects
+    /// store.put(&Path::from("foo"), Bytes::from("foo")).await?;
+    /// store.put(&Path::from("bar"), Bytes::from("bar")).await?;
+    ///
+    /// // List objects
+    /// let locations = store.list(None).await?
+    ///   .map(|meta: Result<ObjectMeta, _>| meta.map(|m| m.location))
+    ///   .boxed();
+    ///
+    /// // Delete them
+    /// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
+    /// # Ok(())
+    /// # }
+    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
+    /// # rt.block_on(example()).unwrap();
+    /// ```
+    fn delete_stream<'a>(
+        &'a self,
+        locations: BoxStream<'a, Result<Path>>,
+    ) -> BoxStream<'a, Result<Path>> {
+        locations
+            .map(|location| async {
+                let location = location?;
+                self.delete(&location).await?;
+                Ok(location)
+            })
+            .buffered(10)
+            .boxed()
+    }
+
     /// List all the objects with the given prefix.
     ///
     /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
@@ -515,6 +572,13 @@ impl ObjectStore for Box<dyn ObjectStore> {
         self.as_ref().delete(location).await
     }
 
+    fn delete_stream<'a>(
+        &'a self,
+        locations: BoxStream<'a, Result<Path>>,
+    ) -> BoxStream<'a, Result<Path>> {
+        self.as_ref().delete_stream(locations)
+    }
+
     async fn list(
         &self,
         prefix: Option<&Path>,
@@ -1119,6 +1183,49 @@ mod tests {
             assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
         }
 
+        // Test bulk delete
+        let paths = vec![
+            Path::from("a/a.file"),
+            Path::from("a/a/b.file"),
+            Path::from("aa/a.file"),
+            Path::from("does_not_exist"),
+            Path::from("I'm a < & weird path"),
+            Path::from("ab/a.file"),
+            Path::from("a/😀.file"),
+        ];
+
+        storage.put(&paths[4], "foo".into()).await.unwrap();
+
+        let out_paths = storage
+            .delete_stream(futures::stream::iter(paths.clone()).map(Ok).boxed())
+            .collect::<Vec<_>>()
+            .await;
+
+        assert_eq!(out_paths.len(), paths.len());
+
+        let expect_errors = [3];
+
+        for (i, input_path) in paths.iter().enumerate() {
+            let err = storage.head(input_path).await.unwrap_err();
+            assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+            if expect_errors.contains(&i) {
+                // Some object stores will report NotFound, but others (such as S3) will
+                // report success regardless.
+                match &out_paths[i] {
+                    Err(Error::NotFound { path: out_path, .. }) => {
+                        assert!(out_path.ends_with(&input_path.to_string()));
+                    }
+                    Ok(out_path) => {
+                        assert_eq!(out_path, input_path);
+                    }
+                    _ => panic!("unexpected error"),
+                }
+            } else {
+                assert_eq!(out_paths[i].as_ref().unwrap(), input_path);
+            }
+        }
+
         delete_fixtures(storage).await;
     }
 
@@ -1471,11 +1578,17 @@ mod tests {
     }
 
     async fn delete_fixtures(storage: &DynObjectStore) {
-        let paths = flatten_list_stream(storage, None).await.unwrap();
-
-        for f in &paths {
-            storage.delete(f).await.unwrap();
-        }
+        let paths = storage
+            .list(None)
+            .await
+            .unwrap()
+            .map_ok(|meta| meta.location)
+            .boxed();
+        storage
+            .delete_stream(paths)
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
     }
 
     /// Test that the returned stream does not borrow the lifetime of Path
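
As the new test above exercises, callers that want per-path outcomes can
collect the stream's items instead of using `try_collect`, which stops at the
first error. A sketch against the in-memory store (which, like S3, reports
success when deleting a missing object; LocalFileSystem would instead yield
[`Error::NotFound`]):

    use futures::{stream, StreamExt};
    use object_store::{memory::InMemory, path::Path, ObjectStore};

    fn main() {
        futures::executor::block_on(async {
            let store = InMemory::new();
            let paths = vec![Path::from("exists"), Path::from("missing")];
            store.put(&paths[0], "x".into()).await.unwrap();

            let results = store
                .delete_stream(stream::iter(paths).map(Ok).boxed())
                .collect::<Vec<_>>()
                .await;

            for result in results {
                match result {
                    Ok(path) => println!("deleted {path}"),
                    // Stores that do report missing objects use NotFound
                    Err(object_store::Error::NotFound { path, .. }) => {
                        println!("already deleted: {path}")
                    }
                    Err(e) => panic!("unexpected error: {e}"),
                }
            }
        });
    }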
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index e0091115d..630fd145b 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -148,6 +148,13 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         self.inner.delete(location).await
     }
 
+    fn delete_stream<'a>(
+        &'a self,
+        locations: BoxStream<'a, Result<Path>>,
+    ) -> BoxStream<'a, Result<Path>> {
+        self.inner.delete_stream(locations)
+    }
+
     async fn list(
         &self,
         prefix: Option<&Path>,
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 52719f1cb..bbd54db2e 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -444,9 +444,12 @@ impl ObjectStore for LocalFileSystem {
 
     async fn delete(&self, location: &Path) -> Result<()> {
         let path = self.config.path_to_filesystem(location)?;
-        maybe_spawn_blocking(move || {
-            std::fs::remove_file(&path).context(UnableToDeleteFileSnafu { path })?;
-            Ok(())
+        maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
+            Ok(_) => Ok(()),
+            Err(e) => Err(match e.kind() {
+                ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
+                _ => Error::UnableToDeleteFile { path, source: e }.into(),
+            }),
         })
         .await
     }