Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/27 09:31:39 UTC
[arrow-rs] branch master updated: feat: support bulk deletes in object_store (#4060)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 770e241ce feat: support bulk deletes in object_store (#4060)
770e241ce is described below
commit 770e241ceac89d693dd7577c72266f6dad48c9e2
Author: Will Jones <wi...@gmail.com>
AuthorDate: Sat May 27 02:31:34 2023 -0700
feat: support bulk deletes in object_store (#4060)
* feat: support bulk deletes
* fix: make NotFound reporting consistent
* fix http store
* fix aws support
* remove unnecessary flag
* fix: make AWS S3 compatible
* pr feedback: use simpler API
* pr feedback: test paths and ordering
* Update object_store/src/limit.rs
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
* take fallible stream
* final pr feedback
---------
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
object_store/CONTRIBUTING.md | 14 +++-
object_store/Cargo.toml | 2 +-
object_store/src/aws/client.rs | 175 ++++++++++++++++++++++++++++++++++++++++
object_store/src/aws/mod.rs | 21 +++++
object_store/src/http/client.rs | 8 +-
object_store/src/lib.rs | 123 ++++++++++++++++++++++++++--
object_store/src/limit.rs | 7 ++
object_store/src/local.rs | 9 ++-
8 files changed, 345 insertions(+), 14 deletions(-)
diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md
index 47c294022..aeb38e13a 100644
--- a/object_store/CONTRIBUTING.md
+++ b/object_store/CONTRIBUTING.md
@@ -39,7 +39,8 @@ To test the S3 integration against [localstack](https://localstack.cloud/)
First start up a container running localstack
```
-$ podman run --rm -it -e PROVIDER_OVERRIDE_S3=asf -p 4566:4566 -p 4510-4559:4510-4559 localstack/localstack
+$ podman run -d -p 4566:4566 localstack/localstack:2.0
+$ podman run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
```
Setup environment
@@ -87,13 +88,18 @@ $ podman run -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azur
Create a bucket
```
-$ podman run --net=host mcr.microsoft.com/azure-cli az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://128.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://128.0.0.1:10001/devstoreaccount1;'
+$ podman run --net=host mcr.microsoft.com/azure-cli az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;'
```
Run tests
-```
-$ cargo test --features azure
+```shell
+AZURE_USE_EMULATOR=1 \
+TEST_INTEGRATION=1 \
+OBJECT_STORE_BUCKET=test-bucket \
+AZURE_STORAGE_ACCOUNT=devstoreaccount1 \
+AZURE_STORAGE_ACCESS_KEY=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== \
+cargo test --features azure
```
### GCP
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 1fb988642..28bf29f7f 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -45,7 +45,7 @@ walkdir = "2"
# Cloud storage support
base64 = { version = "0.21", default-features = false, features = ["std"], optional = true }
hyper = { version = "0.14", default-features = false, optional = true }
-quick-xml = { version = "0.28.0", features = ["serialize"], optional = true }
+quick-xml = { version = "0.28.0", features = ["serialize", "overlapped-lists"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, optional = true }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index cfce35254..0c2493651 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -32,7 +32,9 @@ use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
+use itertools::Itertools;
use percent_encoding::{utf8_percent_encode, PercentEncode};
+use quick_xml::events::{self as xml_events};
use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
@@ -66,6 +68,29 @@ pub(crate) enum Error {
path: String,
},
+ #[snafu(display("Error performing DeleteObjects request: {}", source))]
+ DeleteObjectsRequest { source: crate::client::retry::Error },
+
+ #[snafu(display(
+ "DeleteObjects request failed for key {}: {} (code: {})",
+ path,
+ message,
+ code
+ ))]
+ DeleteFailed {
+ path: String,
+ code: String,
+ message: String,
+ },
+
+ #[snafu(display("Error getting DeleteObjects response body: {}", source))]
+ DeleteObjectsResponse { source: reqwest::Error },
+
+ #[snafu(display("Got invalid DeleteObjects response: {}", source))]
+ InvalidDeleteObjectsResponse {
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+
#[snafu(display("Error performing copy request {}: {}", path, source))]
CopyRequest {
source: crate::client::retry::Error,
@@ -129,6 +154,44 @@ struct MultipartPart {
part_number: usize,
}
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
+struct BatchDeleteResponse {
+ #[serde(rename = "$value")]
+ content: Vec<DeleteObjectResult>,
+}
+
+#[derive(Deserialize)]
+enum DeleteObjectResult {
+ Deleted(DeletedObject),
+ Error(DeleteError),
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase", rename = "Deleted")]
+struct DeletedObject {
+ #[allow(dead_code)]
+ key: String,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase", rename = "Error")]
+struct DeleteError {
+ key: String,
+ code: String,
+ message: String,
+}
+
+impl From<DeleteError> for Error {
+ fn from(err: DeleteError) -> Self {
+ Self::DeleteFailed {
+ path: err.key,
+ code: err.code,
+ message: err.message,
+ }
+ }
+}
+
#[derive(Debug)]
pub struct S3Config {
pub region: String,
@@ -243,6 +306,118 @@ impl S3Client {
Ok(())
}
+ /// Make an S3 Delete Objects request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
+ ///
+ /// Produces a vector of results, one for each path in the input vector and
+ /// in the same order. If a path was deleted successfully, its entry is the
+ /// path in the `Ok` variant; if deleting a specific path failed, its entry
+ /// holds that error. If the request as a whole failed, an error is returned
+ /// at the top level instead.
+ pub async fn bulk_delete_request(
+ &self,
+ paths: Vec<Path>,
+ ) -> Result<Vec<Result<Path>>> {
+ if paths.is_empty() {
+ return Ok(Vec::new());
+ }
+
+ let credential = self.get_credential().await?;
+ let url = format!("{}?delete", self.config.bucket_endpoint);
+
+ let mut buffer = Vec::new();
+ let mut writer = quick_xml::Writer::new(&mut buffer);
+ writer
+ .write_event(xml_events::Event::Start(
+ xml_events::BytesStart::new("Delete").with_attributes([(
+ "xmlns",
+ "http://s3.amazonaws.com/doc/2006-03-01/",
+ )]),
+ ))
+ .unwrap();
+ for path in &paths {
+ // <Object><Key>{path}</Key></Object>
+ writer
+ .write_event(xml_events::Event::Start(xml_events::BytesStart::new(
+ "Object",
+ )))
+ .unwrap();
+ writer
+ .write_event(xml_events::Event::Start(xml_events::BytesStart::new("Key")))
+ .unwrap();
+ writer
+ .write_event(xml_events::Event::Text(xml_events::BytesText::new(
+ path.as_ref(),
+ )))
+ .map_err(|err| crate::Error::Generic {
+ store: STORE,
+ source: Box::new(err),
+ })?;
+ writer
+ .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Key")))
+ .unwrap();
+ writer
+ .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Object")))
+ .unwrap();
+ }
+ writer
+ .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Delete")))
+ .unwrap();
+
+ let body = Bytes::from(buffer);
+
+ let mut builder = self.client.request(Method::POST, url);
+
+ // Compute a checksum - S3 requires one for DeleteObjects requests, so default
+ // to SHA256 if the user hasn't configured an algorithm.
+ let checksum = self.config().checksum.unwrap_or(Checksum::SHA256);
+ let digest = checksum.digest(&body);
+ builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
+ let payload_sha256 = if checksum == Checksum::SHA256 {
+ Some(digest)
+ } else {
+ None
+ };
+
+ let response = builder
+ .header(CONTENT_TYPE, "application/xml")
+ .body(body)
+ .with_aws_sigv4(
+ credential.as_ref(),
+ &self.config.region,
+ "s3",
+ self.config.sign_payload,
+ payload_sha256.as_deref(),
+ )
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(DeleteObjectsRequestSnafu {})?
+ .bytes()
+ .await
+ .context(DeleteObjectsResponseSnafu {})?;
+
+ let response: BatchDeleteResponse = quick_xml::de::from_reader(response.reader())
+ .map_err(|err| Error::InvalidDeleteObjectsResponse {
+ source: Box::new(err),
+ })?;
+
+ // Assume all were ok, then fill in errors. This guarantees output order
+ // matches input order.
+ let mut results: Vec<Result<Path>> = paths.iter().cloned().map(Ok).collect();
+ for content in response.content.into_iter() {
+ if let DeleteObjectResult::Error(error) = content {
+ let path = Path::parse(&error.key).map_err(|err| {
+ Error::InvalidDeleteObjectsResponse {
+ source: Box::new(err),
+ }
+ })?;
+ let i = paths.iter().find_position(|&p| p == &path).unwrap().0;
+ results[i] = Err(Error::from(error).into());
+ }
+ }
+
+ Ok(results)
+ }
+
/// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
pub async fn copy_request(&self, from: &Path, to: &Path) -> Result<()> {
let credential = self.get_credential().await?;
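The response structs above rely on quick-xml's serde integration, and the `overlapped-lists` feature newly enabled in Cargo.toml is what lets `<Deleted>` and `<Error>` elements interleave inside a single `<DeleteResult>`. A minimal sketch of that deserialization in isolation (the struct definitions mirror the commit; the sample XML follows the documented DeleteObjects response shape):

```rust
use serde::Deserialize;

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
    #[serde(rename = "$value")]
    content: Vec<DeleteObjectResult>,
}

#[derive(Deserialize)]
enum DeleteObjectResult {
    Deleted(DeletedObject),
    Error(DeleteError),
}

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "Deleted")]
struct DeletedObject {
    key: String,
}

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "Error")]
struct DeleteError {
    key: String,
    code: String,
    message: String,
}

fn main() {
    // Hand-written sample: one success and one failure, which S3 may
    // interleave freely (hence the overlapped-lists feature).
    let xml = r#"
        <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
            <Deleted><Key>a/ok.file</Key></Deleted>
            <Error>
                <Key>a/bad.file</Key>
                <Code>AccessDenied</Code>
                <Message>Access Denied</Message>
            </Error>
        </DeleteResult>"#;

    let response: BatchDeleteResponse = quick_xml::de::from_str(xml).unwrap();
    for entry in &response.content {
        match entry {
            DeleteObjectResult::Deleted(d) => println!("deleted {}", d.key),
            DeleteObjectResult::Error(e) => {
                println!("failed {}: {} ({})", e.key, e.message, e.code)
            }
        }
    }
}
```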
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index fac6165b5..3696e4ad4 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -34,6 +34,7 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
@@ -252,6 +253,26 @@ impl ObjectStore for AmazonS3 {
self.client.delete_request(location, &()).await
}
+ fn delete_stream<'a>(
+ &'a self,
+ locations: BoxStream<'a, Result<Path>>,
+ ) -> BoxStream<'a, Result<Path>> {
+ locations
+ .try_chunks(1_000)
+ .map(move |locations| async {
+ // Return early on error, dropping any paths already collected
+ // into this chunk.
+ let locations = locations.map_err(|e| e.1)?;
+ self.client
+ .bulk_delete_request(locations)
+ .await
+ .map(futures::stream::iter)
+ })
+ .buffered(20)
+ .try_flatten()
+ .boxed()
+ }
+
async fn list(
&self,
prefix: Option<&Path>,
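The `delete_stream` override above is a chunk-then-parallelize pattern from the `futures` crate: `try_chunks(1_000)` batches paths up to the 1,000-key DeleteObjects limit, each batch becomes one bulk request, `buffered(20)` keeps up to 20 requests in flight, and `try_flatten` splices the per-batch result streams back into one. A standalone sketch of the same combinator shape, with a hypothetical `delete_batch` standing in for `bulk_delete_request`:

```rust
use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};

// Hypothetical stand-in for a bulk API call: "deletes" one batch and
// reports a per-item result, as bulk_delete_request does.
async fn delete_batch(batch: Vec<String>) -> Result<Vec<Result<String, String>>, String> {
    Ok(batch.into_iter().map(Ok).collect())
}

fn delete_all(
    paths: BoxStream<'_, Result<String, String>>,
) -> BoxStream<'_, Result<String, String>> {
    paths
        .try_chunks(1_000) // batch up to the 1,000-key DeleteObjects limit
        .map(|chunk| async {
            // A failed input item fails its batch; paths already collected
            // into the chunk are dropped, as in the commit.
            let batch = chunk.map_err(|e| e.1)?;
            delete_batch(batch).await.map(stream::iter)
        })
        .buffered(20) // keep at most 20 batch requests in flight
        .try_flatten() // splice per-batch results back into one stream
        .boxed()
}

#[tokio::main]
async fn main() {
    let input: BoxStream<'static, Result<String, String>> =
        stream::iter((0..3).map(|i| Ok(format!("file-{i}")))).boxed();
    let results: Vec<_> = delete_all(input).collect().await;
    println!("{results:?}");
}
```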
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 6feacbba6..1d3df34db 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -225,7 +225,13 @@ impl Client {
.delete(url)
.send_retry(&self.retry_config)
.await
- .context(RequestSnafu)?;
+ .map_err(|source| match source.status() {
+ Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
+ source: Box::new(source),
+ path: path.to_string(),
+ },
+ _ => Error::Request { source }.into(),
+ })?;
Ok(())
}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 7116a8732..c5bf40cc4 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -386,6 +386,63 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Delete the object at the specified location.
async fn delete(&self, location: &Path) -> Result<()>;
+ /// Delete all the objects at the specified locations
+ ///
+ /// When supported, this method will use bulk operations that delete more
+ /// than one object per request. The default implementation will call
+ /// the single object delete method for each location, but with up to 10
+ /// concurrent requests.
+ ///
+ /// The returned stream yields the results of the delete operations in the
+ /// same order as the input locations. Note, however, that some errors may
+ /// come from the bulk delete call as a whole rather than from any one
+ /// specific location.
+ ///
+ /// If the object did not exist, the result may be an error or a success,
+ /// depending on the behavior of the underlying store. For example, local
+ /// filesystems, GCP, and Azure return an error, while S3 and in-memory will
+ /// return Ok. If it is an error, it will be [`Error::NotFound`].
+ ///
+ /// ```
+ /// # use object_store::local::LocalFileSystem;
+ /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+ /// # let root = tempfile::TempDir::new().unwrap();
+ /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+ /// use object_store::{ObjectStore, ObjectMeta};
+ /// use object_store::path::Path;
+ /// use futures::{StreamExt, TryStreamExt};
+ /// use bytes::Bytes;
+ ///
+ /// // Create two objects
+ /// store.put(&Path::from("foo"), Bytes::from("foo")).await?;
+ /// store.put(&Path::from("bar"), Bytes::from("bar")).await?;
+ ///
+ /// // List the objects
+ /// let locations = store.list(None).await?
+ /// .map(|meta: Result<ObjectMeta, _>| meta.map(|m| m.location))
+ /// .boxed();
+ ///
+ /// // Delete them
+ /// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
+ /// # Ok(())
+ /// # }
+ /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
+ /// # rt.block_on(example()).unwrap();
+ /// ```
+ fn delete_stream<'a>(
+ &'a self,
+ locations: BoxStream<'a, Result<Path>>,
+ ) -> BoxStream<'a, Result<Path>> {
+ locations
+ .map(|location| async {
+ let location = location?;
+ self.delete(&location).await?;
+ Ok(location)
+ })
+ .buffered(10)
+ .boxed()
+ }
+
/// List all the objects with the given prefix.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
@@ -515,6 +572,13 @@ impl ObjectStore for Box<dyn ObjectStore> {
self.as_ref().delete(location).await
}
+ fn delete_stream<'a>(
+ &'a self,
+ locations: BoxStream<'a, Result<Path>>,
+ ) -> BoxStream<'a, Result<Path>> {
+ self.as_ref().delete_stream(locations)
+ }
+
async fn list(
&self,
prefix: Option<&Path>,
@@ -1119,6 +1183,49 @@ mod tests {
assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
}
+ // Test bulk delete
+ let paths = vec![
+ Path::from("a/a.file"),
+ Path::from("a/a/b.file"),
+ Path::from("aa/a.file"),
+ Path::from("does_not_exist"),
+ Path::from("I'm a < & weird path"),
+ Path::from("ab/a.file"),
+ Path::from("a/😀.file"),
+ ];
+
+ storage.put(&paths[4], "foo".into()).await.unwrap();
+
+ let out_paths = storage
+ .delete_stream(futures::stream::iter(paths.clone()).map(Ok).boxed())
+ .collect::<Vec<_>>()
+ .await;
+
+ assert_eq!(out_paths.len(), paths.len());
+
+ let expect_errors = [3];
+
+ for (i, input_path) in paths.iter().enumerate() {
+ let err = storage.head(input_path).await.unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+ if expect_errors.contains(&i) {
+ // Some object stores will report NotFound, but others (such as S3) will
+ // report success regardless.
+ match &out_paths[i] {
+ Err(Error::NotFound { path: out_path, .. }) => {
+ assert!(out_path.ends_with(&input_path.to_string()));
+ }
+ Ok(out_path) => {
+ assert_eq!(out_path, input_path);
+ }
+ _ => panic!("unexpected error"),
+ }
+ } else {
+ assert_eq!(out_paths[i].as_ref().unwrap(), input_path);
+ }
+ }
+
delete_fixtures(storage).await;
}
@@ -1471,11 +1578,17 @@ mod tests {
}
async fn delete_fixtures(storage: &DynObjectStore) {
- let paths = flatten_list_stream(storage, None).await.unwrap();
-
- for f in &paths {
- storage.delete(f).await.unwrap();
- }
+ let paths = storage
+ .list(None)
+ .await
+ .unwrap()
+ .map_ok(|meta| meta.location)
+ .boxed();
+ storage
+ .delete_stream(paths)
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
}
/// Test that the returned stream does not borrow the lifetime of Path
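Because stores disagree on whether deleting a missing object is an error (as the new `delete_stream` doc comment notes), callers wanting idempotent cleanup can filter `Error::NotFound` out of the result stream instead of failing on it. A sketch of that pattern, using the in-memory store for brevity:

```rust
use futures::{StreamExt, TryStreamExt};
use object_store::{memory::InMemory, path::Path, Error, ObjectStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = InMemory::new();
    store.put(&Path::from("exists"), "data".into()).await?;

    // One existing object, one missing; some stores error on the latter.
    let locations = futures::stream::iter([
        Ok(Path::from("exists")),
        Ok(Path::from("missing")),
    ])
    .boxed();

    // Treat NotFound as success, propagate everything else.
    let deleted: Vec<Path> = store
        .delete_stream(locations)
        .filter_map(|result| async {
            match result {
                Err(Error::NotFound { .. }) => None,
                other => Some(other),
            }
        })
        .try_collect()
        .await?;

    println!("deleted: {deleted:?}");
    Ok(())
}
```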
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index e0091115d..630fd145b 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -148,6 +148,13 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.delete(location).await
}
+ fn delete_stream<'a>(
+ &'a self,
+ locations: BoxStream<'a, Result<Path>>,
+ ) -> BoxStream<'a, Result<Path>> {
+ self.inner.delete_stream(locations)
+ }
+
async fn list(
&self,
prefix: Option<&Path>,
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 52719f1cb..bbd54db2e 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -444,9 +444,12 @@ impl ObjectStore for LocalFileSystem {
async fn delete(&self, location: &Path) -> Result<()> {
let path = self.config.path_to_filesystem(location)?;
- maybe_spawn_blocking(move || {
- std::fs::remove_file(&path).context(UnableToDeleteFileSnafu { path })?;
- Ok(())
+ maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
+ Ok(_) => Ok(()),
+ Err(e) => Err(match e.kind() {
+ ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
+ _ => Error::UnableToDeleteFile { path, source: e }.into(),
+ }),
})
.await
}
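With this change, a missing file surfaces as the typed `Error::NotFound` rather than a generic delete failure, matching the other backends and letting the shared tests branch on the error kind. A small sketch of the now-observable behavior, assuming a scratch directory from `tempfile`:

```rust
use object_store::{local::LocalFileSystem, path::Path, Error, ObjectStore};

#[tokio::main]
async fn main() {
    let dir = tempfile::TempDir::new().unwrap();
    let store = LocalFileSystem::new_with_prefix(dir.path()).unwrap();

    // Deleting a path that was never written now maps the underlying
    // io ErrorKind::NotFound to the typed Error::NotFound.
    let err = store.delete(&Path::from("missing")).await.unwrap_err();
    assert!(matches!(err, Error::NotFound { .. }));
}
```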