You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/07/10 18:04:55 UTC

[arrow-rs] branch master updated: object_store: Implement `ObjectStore` for `Arc` (#4502)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8da2f97bf object_store: Implement `ObjectStore` for `Arc` (#4502)
8da2f97bf is described below

commit 8da2f97bfd9a613c02acbd4b329d11937ca6257f
Author: Tobias Bieniek <to...@bieniek.cloud>
AuthorDate: Mon Jul 10 20:04:50 2023 +0200

    object_store: Implement `ObjectStore` for `Arc` (#4502)
    
    * object_store: Add `Box<dyn ObjectStore>` tests
    
    * object_store: Extract `as_ref_impl!()` macro
    
    * object_store: Implement `ObjectStore` for `Arc`
---
 object_store/src/lib.rs    | 179 +++++++++++++++++++++++++--------------------
 object_store/src/memory.rs |  26 +++++++
 2 files changed, 125 insertions(+), 80 deletions(-)

diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 864cabc4a..97e6aae97 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -270,6 +270,7 @@ use std::fmt::{Debug, Formatter};
 #[cfg(not(target_arch = "wasm32"))]
 use std::io::{Read, Seek, SeekFrom};
 use std::ops::Range;
+use std::sync::Arc;
 use tokio::io::AsyncWrite;
 
 #[cfg(any(feature = "azure", feature = "aws", feature = "gcp", feature = "http"))]
@@ -526,105 +527,123 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     }
 }
 
-#[async_trait]
-impl ObjectStore for Box<dyn ObjectStore> {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        self.as_ref().put(location, bytes).await
-    }
+macro_rules! as_ref_impl {
+    ($type:ty) => {
+        #[async_trait]
+        impl ObjectStore for $type {
+            async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+                self.as_ref().put(location, bytes).await
+            }
 
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        self.as_ref().put_multipart(location).await
-    }
+            async fn put_multipart(
+                &self,
+                location: &Path,
+            ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+                self.as_ref().put_multipart(location).await
+            }
 
-    async fn abort_multipart(
-        &self,
-        location: &Path,
-        multipart_id: &MultipartId,
-    ) -> Result<()> {
-        self.as_ref().abort_multipart(location, multipart_id).await
-    }
+            async fn abort_multipart(
+                &self,
+                location: &Path,
+                multipart_id: &MultipartId,
+            ) -> Result<()> {
+                self.as_ref().abort_multipart(location, multipart_id).await
+            }
 
-    async fn append(
-        &self,
-        location: &Path,
-    ) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
-        self.as_ref().append(location).await
-    }
+            async fn append(
+                &self,
+                location: &Path,
+            ) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
+                self.as_ref().append(location).await
+            }
 
-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        self.as_ref().get(location).await
-    }
+            async fn get(&self, location: &Path) -> Result<GetResult> {
+                self.as_ref().get(location).await
+            }
 
-    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
-        self.as_ref().get_opts(location, options).await
-    }
+            async fn get_opts(
+                &self,
+                location: &Path,
+                options: GetOptions,
+            ) -> Result<GetResult> {
+                self.as_ref().get_opts(location, options).await
+            }
 
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
-        self.as_ref().get_range(location, range).await
-    }
+            async fn get_range(
+                &self,
+                location: &Path,
+                range: Range<usize>,
+            ) -> Result<Bytes> {
+                self.as_ref().get_range(location, range).await
+            }
 
-    async fn get_ranges(
-        &self,
-        location: &Path,
-        ranges: &[Range<usize>],
-    ) -> Result<Vec<Bytes>> {
-        self.as_ref().get_ranges(location, ranges).await
-    }
+            async fn get_ranges(
+                &self,
+                location: &Path,
+                ranges: &[Range<usize>],
+            ) -> Result<Vec<Bytes>> {
+                self.as_ref().get_ranges(location, ranges).await
+            }
 
-    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        self.as_ref().head(location).await
-    }
+            async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+                self.as_ref().head(location).await
+            }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        self.as_ref().delete(location).await
-    }
+            async fn delete(&self, location: &Path) -> Result<()> {
+                self.as_ref().delete(location).await
+            }
 
-    fn delete_stream<'a>(
-        &'a self,
-        locations: BoxStream<'a, Result<Path>>,
-    ) -> BoxStream<'a, Result<Path>> {
-        self.as_ref().delete_stream(locations)
-    }
+            fn delete_stream<'a>(
+                &'a self,
+                locations: BoxStream<'a, Result<Path>>,
+            ) -> BoxStream<'a, Result<Path>> {
+                self.as_ref().delete_stream(locations)
+            }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        self.as_ref().list(prefix).await
-    }
+            async fn list(
+                &self,
+                prefix: Option<&Path>,
+            ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+                self.as_ref().list(prefix).await
+            }
 
-    async fn list_with_offset(
-        &self,
-        prefix: Option<&Path>,
-        offset: &Path,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        self.as_ref().list_with_offset(prefix, offset).await
-    }
+            async fn list_with_offset(
+                &self,
+                prefix: Option<&Path>,
+                offset: &Path,
+            ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+                self.as_ref().list_with_offset(prefix, offset).await
+            }
 
-    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
-        self.as_ref().list_with_delimiter(prefix).await
-    }
+            async fn list_with_delimiter(
+                &self,
+                prefix: Option<&Path>,
+            ) -> Result<ListResult> {
+                self.as_ref().list_with_delimiter(prefix).await
+            }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.as_ref().copy(from, to).await
-    }
+            async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+                self.as_ref().copy(from, to).await
+            }
 
-    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
-        self.as_ref().rename(from, to).await
-    }
+            async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+                self.as_ref().rename(from, to).await
+            }
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.as_ref().copy_if_not_exists(from, to).await
-    }
+            async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+                self.as_ref().copy_if_not_exists(from, to).await
+            }
 
-    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.as_ref().rename_if_not_exists(from, to).await
-    }
+            async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+                self.as_ref().rename_if_not_exists(from, to).await
+            }
+        }
+    };
 }
 
+as_ref_impl!(Arc<dyn ObjectStore>);
+as_ref_impl!(Box<dyn ObjectStore>);
+
 /// Result of a list call that includes objects, prefixes (directories) and a
 /// token for the next set of results. Individual result sets may be limited to
 /// 1,000 objects based on the underlying object storage's limitations.
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 98b3a15ee..cfc2ac823 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -415,6 +415,32 @@ mod tests {
         stream_get(&integration).await;
     }
 
+    #[tokio::test]
+    async fn box_test() {
+        let integration: Box<dyn ObjectStore> = Box::new(InMemory::new());
+
+        put_get_delete_list(&integration).await;
+        get_opts(&integration).await;
+        list_uses_directories_correctly(&integration).await;
+        list_with_delimiter(&integration).await;
+        rename_and_copy(&integration).await;
+        copy_if_not_exists(&integration).await;
+        stream_get(&integration).await;
+    }
+
+    #[tokio::test]
+    async fn arc_test() {
+        let integration: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+
+        put_get_delete_list(&integration).await;
+        get_opts(&integration).await;
+        list_uses_directories_correctly(&integration).await;
+        list_with_delimiter(&integration).await;
+        rename_and_copy(&integration).await;
+        copy_if_not_exists(&integration).await;
+        stream_get(&integration).await;
+    }
+
     #[tokio::test]
     async fn unknown_length() {
         let integration = InMemory::new();