Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/05/22 09:34:07 UTC

[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4060: feat: support bulk deletes in object_store

tustvold commented on code in PR #4060:
URL: https://github.com/apache/arrow-rs/pull/4060#discussion_r1200244730


##########
object_store/src/util.rs:
##########
@@ -170,6 +178,22 @@ fn merge_ranges(
     ret
 }
 
+/// Common implementation for delete_all
+#[allow(dead_code)]
+pub(crate) fn delete_all_helper<'a>(
+    locations: BoxStream<'a, Path>,
+    chunk_size: usize,
+    request_handler: impl Fn(

Review Comment:
   A generic parameter instead of an `impl Fn` argument might be easier to read.
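
For illustration, here is a synchronous, std-only sketch of the helper with the closure type named as a generic parameter `F` and bounded in a `where` clause. The `String` paths, the iterator input and the per-chunk handler signature are simplifications assumed here. They are not the PR's actual async `BoxStream` signature. Argument-position `impl Fn(..)` is sugar for an anonymous generic parameter, so behaviour is unchanged. Naming it mainly moves the long bound out of the argument list, and it also lets callers use turbofish, which `impl Trait` arguments do not allow.

```rust
/// Hypothetical, simplified stand-in for `delete_all_helper`: split `locations`
/// into chunks of at most `chunk_size` and hand each chunk to `request_handler`.
fn delete_all_helper<I, F>(
    locations: I,
    chunk_size: usize,
    request_handler: F,
) -> Vec<Result<String, String>>
where
    I: IntoIterator<Item = String>,
    // The bound that would otherwise sit inline as `impl Fn(...)`.
    F: Fn(Vec<String>) -> Vec<Result<String, String>>,
{
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let locations: Vec<String> = locations.into_iter().collect();
    locations
        .chunks(chunk_size)
        // One handler call per chunk; per-path results are concatenated in order.
        .flat_map(|chunk| request_handler(chunk.to_vec()))
        .collect()
}
```

With five paths and `chunk_size = 2`, the handler is invoked three times (2 + 2 + 1 paths).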



##########
object_store/src/lib.rs:
##########
@@ -1119,6 +1189,59 @@ mod tests {
             assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
         }
 
+        // Test bulk delete
+        let paths = vec![
+            Path::from("a/a.file"),
+            Path::from("a/a/b.file"),
+            Path::from("aa/a.file"),
+            Path::from("ab/a.file"),
+            Path::from("a/😀.file"),
+        ];
+
+        let out_paths = storage
+            .delete_stream(futures::stream::iter(paths.clone()).boxed())
+            .buffered(5)
+            .map_ok(futures::stream::iter)
+            .try_flatten()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        for path in &paths {
+            let err = storage.head(path).await.unwrap_err();
+            assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+        }
+
+        // Some object stores return results out of order (for example, S3)

Review Comment:
   Sorting the output instead of collecting into a HashSet might make for easier-to-read error messages.
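
A minimal sketch of the sorting approach: a small generic helper (the name `sorted` is hypothetical) through which both sides of the assertion pass, so a failing `assert_eq!` prints two ordered lists. It assumes the element type implements `Ord`; if `Path` does not, comparing on `path.to_string()` would work the same way.

```rust
/// Hypothetical test helper: collect into a Vec and sort it, so that results
/// returned out of order (e.g. by S3) compare equal to the expected list.
fn sorted<T: Ord, I: IntoIterator<Item = T>>(items: I) -> Vec<T> {
    let mut v: Vec<T> = items.into_iter().collect();
    v.sort();
    v
}
```

In the test above this would read `assert_eq!(sorted(out_paths), sorted(paths));`. Unlike a `HashSet`, the sorted `Vec` also keeps duplicates, so a path reported twice fails the comparison instead of being silently merged.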



##########
object_store/src/aws/mod.rs:
##########
@@ -249,6 +251,15 @@ impl ObjectStore for AmazonS3 {
         self.client.delete_request(location, &()).await
     }
 
+    fn delete_stream<'a>(
+        &'a self,
+        locations: BoxStream<'a, Path>,
+    ) -> BoxStream<'a, BoxFuture<'a, Result<Vec<Result<Path>>>>> {

Review Comment:
   This signature, whilst it does provide maximum flexibility to upstreams, is kind of obnoxious to use.
   
   What do you think of returning `BoxStream<'a, Result<Path>>` and letting the individual stores control the concurrency, much like we do for `coalesce_ranges`?
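
To make the ergonomic difference concrete, here is a std-only sketch that uses iterators as a stand-in for streams, so it needs no `futures` dependency. The nested shape (batches that can each fail as a whole, and otherwise carry per-path results) is flattened into one sequence of per-path results, roughly what a `BoxStream<'a, Result<Path>>` would hand to callers. `String` stands in for both `Path` and the error type. How a whole-batch failure should be reported is an open design choice; this sketch assumes it becomes a single error item.

```rust
/// One batch = one bulk-delete request: either the request failed as a whole,
/// or it returned a result per path. (`String` stands in for `Path` and the error type.)
type Batch = Result<Vec<Result<String, String>>, String>;

/// Flatten the nested per-batch shape into a single per-path sequence,
/// so callers can simply iterate (or `try_collect`, in a stream version).
fn flatten_batches<I>(batches: I) -> impl Iterator<Item = Result<String, String>>
where
    I: IntoIterator<Item = Batch>,
{
    batches.into_iter().flat_map(|batch| match batch {
        Ok(per_path) => per_path,
        // Assumption: a failed request surfaces as one error item.
        Err(request_error) => vec![Err(request_error)],
    })
}
```

The trade-off of this particular choice is that, after a whole-request failure, the caller no longer learns which paths that batch contained; a store could instead emit one error per path in the failed batch.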



##########
object_store/src/aws/client.rs:
##########
@@ -129,6 +153,43 @@ struct MultipartPart {
     part_number: usize,
 }
 
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
+struct BatchDeleteResponse {

Review Comment:
   Given we seem to have enabled a feature to get this to deserialize correctly, could we perhaps get a basic test of this deserialization logic, i.e. that a payload with mixed successes and failures deserializes correctly?
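
A sketch of the assertions such a test might make, under stated assumptions. The quick-xml/serde deserialization step itself needs external crates and is not reproduced here, so the example starts from a hypothetical enum `DeleteEntry` representing already-deserialized `<Deleted>` and `<Error>` children. The example payload in the comment follows the general shape of an S3 DeleteObjects response. The fixture mixes successes and failures, so the test can check both that each entry maps to the right variant and that document order is preserved.

```rust
// Hypothetical already-deserialized children of a DeleteObjects response, e.g.
//
//   <DeleteResult>
//     <Deleted><Key>a/a.file</Key></Deleted>
//     <Error><Key>aa/a.file</Key><Code>AccessDenied</Code><Message>Access Denied</Message></Error>
//     <Deleted><Key>ab/a.file</Key></Deleted>
//   </DeleteResult>
#[derive(Debug, Clone, PartialEq)]
enum DeleteEntry {
    Deleted { key: String },
    Error { key: String, code: String, message: String },
}

/// Map each entry to a per-path result, keeping document order.
fn into_results(entries: Vec<DeleteEntry>) -> Vec<Result<String, String>> {
    entries
        .into_iter()
        .map(|entry| match entry {
            DeleteEntry::Deleted { key } => Ok(key),
            DeleteEntry::Error { key, code, message } => {
                Err(format!("{key}: {code} ({message})"))
            }
        })
        .collect()
}

/// The mixed fixture from the comment above, as deserialized values.
fn mixed_fixture() -> Vec<DeleteEntry> {
    vec![
        DeleteEntry::Deleted { key: "a/a.file".to_string() },
        DeleteEntry::Error {
            key: "aa/a.file".to_string(),
            code: "AccessDenied".to_string(),
            message: "Access Denied".to_string(),
        },
        DeleteEntry::Deleted { key: "ab/a.file".to_string() },
    ]
}
```

In the real test, `mixed_fixture()` would be replaced by feeding the raw XML through the quick-xml deserializer, with the same assertions on the result.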



##########
object_store/src/aws/client.rs:
##########
@@ -243,6 +304,83 @@ impl S3Client {
         Ok(())
     }
 
+    /// Make an S3 Delete Objects request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
+    ///
+    /// Produces a vector of results, one for each path in the input vector. If
+    /// the delete was successful, the path is returned in the `Ok` variant. If
+    /// there was an error for a certain path, the error will be returned in the
+    /// vector. If there was an issue with making the overall request, an error
+    /// will be returned at the top level.
+    pub async fn bulk_delete_request(
+        &self,
+        paths: Vec<Path>,
+    ) -> Result<Vec<Result<Path>>> {
+        if paths.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let credential = self.get_credential().await?;
+        let url = format!("{}?delete", self.config.bucket_endpoint);
+
+        let inner_body = paths
+            .iter()
+            .map(|path| format!("<Object><Key>{}</Key></Object>", path))

Review Comment:
   I think this might run into weirdness due to XML escaping. Could we use quick-xml to serialize this payload instead of string formatting?
   
   I think `Path` allows '&', for example; a test of this would be superb.
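
For reference, a std-only sketch of what the escaping has to cover (helper names are hypothetical): the five predefined XML entities are replaced in each key before the `<Object>` elements are formatted. This only illustrates the problem and what a test could assert; serializing with quick-xml, as suggested, would delegate this to the library instead of hand-rolling it.

```rust
/// Escape the five predefined XML entities so a key is safe inside element text.
fn escape_xml_text(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            // Everything else, including non-ASCII such as emoji, passes through.
            _ => out.push(c),
        }
    }
    out
}

/// Hypothetical equivalent of the `inner_body` construction in the diff, with escaping.
fn object_elements(keys: &[&str]) -> String {
    keys.iter()
        .map(|key| format!("<Object><Key>{}</Key></Object>", escape_xml_text(key)))
        .collect()
}
```

Escaping alone does not help with control characters that XML 1.0 cannot represent at all, which is another case a test could pin down.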



##########
object_store/src/util.rs:
##########
@@ -170,6 +178,22 @@ fn merge_ranges(
     ret
 }
 
+/// Common implementation for delete_all
+#[allow(dead_code)]
+pub(crate) fn delete_all_helper<'a>(

Review Comment:
   I wonder if we could follow the pattern in #4220 of using extension traits for this
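
A generic sketch of the extension-trait pattern, using hypothetical names (`Store`, `StoreDeleteExt`, `MemStore`) rather than the real `ObjectStore` trait, and not claiming to mirror the exact shape used in #4220. The shared logic lives in a provided method of a separate trait, and a blanket impl gives it to every store, including `dyn` trait objects, instead of exposing a free `pub(crate)` helper.

```rust
use std::cell::RefCell;
use std::collections::HashSet;

/// Hypothetical minimal store trait standing in for `ObjectStore`.
trait Store {
    fn delete(&self, path: &str) -> Result<(), String>;
}

/// Extension trait: adds `delete_all` on top of `Store` without changing `Store` itself.
trait StoreDeleteExt: Store {
    /// Default implementation built only from `Store::delete`.
    fn delete_all(&self, paths: &[&str]) -> Vec<Result<String, String>> {
        paths
            .iter()
            .map(|path| self.delete(path).map(|()| path.to_string()))
            .collect()
    }
}

/// Blanket impl: every `Store`, including `dyn Store`, gets `delete_all`.
impl<T: Store + ?Sized> StoreDeleteExt for T {}

/// Toy in-memory store used to exercise the extension trait.
struct MemStore {
    objects: RefCell<HashSet<String>>,
}

impl Store for MemStore {
    fn delete(&self, path: &str) -> Result<(), String> {
        if self.objects.borrow_mut().remove(path) {
            Ok(())
        } else {
            Err(format!("{path}: not found"))
        }
    }
}
```

A store with a native bulk API (such as S3) would still need a way to override the default; with a plain blanket impl that override has to live elsewhere, which is one of the design questions this pattern raises.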



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
