You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/07/10 18:04:55 UTC
[arrow-rs] branch master updated: object_store: Implement `ObjectStore` for `Arc` (#4502)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8da2f97bf object_store: Implement `ObjectStore` for `Arc` (#4502)
8da2f97bf is described below
commit 8da2f97bfd9a613c02acbd4b329d11937ca6257f
Author: Tobias Bieniek <to...@bieniek.cloud>
AuthorDate: Mon Jul 10 20:04:50 2023 +0200
object_store: Implement `ObjectStore` for `Arc` (#4502)
* object_store: Add `Box<dyn ObjectStore>` tests
* object_store: Extract `as_ref_impl!()` macro
* object_store: Implement `ObjectStore` for `Arc`
---
object_store/src/lib.rs | 179 +++++++++++++++++++++++++--------------------
object_store/src/memory.rs | 26 +++++++
2 files changed, 125 insertions(+), 80 deletions(-)
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 864cabc4a..97e6aae97 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -270,6 +270,7 @@ use std::fmt::{Debug, Formatter};
#[cfg(not(target_arch = "wasm32"))]
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
+use std::sync::Arc;
use tokio::io::AsyncWrite;
#[cfg(any(feature = "azure", feature = "aws", feature = "gcp", feature = "http"))]
@@ -526,105 +527,123 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
}
}
-#[async_trait]
-impl ObjectStore for Box<dyn ObjectStore> {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- self.as_ref().put(location, bytes).await
- }
+macro_rules! as_ref_impl {
+ ($type:ty) => {
+ #[async_trait]
+ impl ObjectStore for $type {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ self.as_ref().put(location, bytes).await
+ }
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- self.as_ref().put_multipart(location).await
- }
+ async fn put_multipart(
+ &self,
+ location: &Path,
+ ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ self.as_ref().put_multipart(location).await
+ }
- async fn abort_multipart(
- &self,
- location: &Path,
- multipart_id: &MultipartId,
- ) -> Result<()> {
- self.as_ref().abort_multipart(location, multipart_id).await
- }
+ async fn abort_multipart(
+ &self,
+ location: &Path,
+ multipart_id: &MultipartId,
+ ) -> Result<()> {
+ self.as_ref().abort_multipart(location, multipart_id).await
+ }
- async fn append(
- &self,
- location: &Path,
- ) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
- self.as_ref().append(location).await
- }
+ async fn append(
+ &self,
+ location: &Path,
+ ) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
+ self.as_ref().append(location).await
+ }
- async fn get(&self, location: &Path) -> Result<GetResult> {
- self.as_ref().get(location).await
- }
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ self.as_ref().get(location).await
+ }
- async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
- self.as_ref().get_opts(location, options).await
- }
+ async fn get_opts(
+ &self,
+ location: &Path,
+ options: GetOptions,
+ ) -> Result<GetResult> {
+ self.as_ref().get_opts(location, options).await
+ }
- async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
- self.as_ref().get_range(location, range).await
- }
+ async fn get_range(
+ &self,
+ location: &Path,
+ range: Range<usize>,
+ ) -> Result<Bytes> {
+ self.as_ref().get_range(location, range).await
+ }
- async fn get_ranges(
- &self,
- location: &Path,
- ranges: &[Range<usize>],
- ) -> Result<Vec<Bytes>> {
- self.as_ref().get_ranges(location, ranges).await
- }
+ async fn get_ranges(
+ &self,
+ location: &Path,
+ ranges: &[Range<usize>],
+ ) -> Result<Vec<Bytes>> {
+ self.as_ref().get_ranges(location, ranges).await
+ }
- async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- self.as_ref().head(location).await
- }
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+ self.as_ref().head(location).await
+ }
- async fn delete(&self, location: &Path) -> Result<()> {
- self.as_ref().delete(location).await
- }
+ async fn delete(&self, location: &Path) -> Result<()> {
+ self.as_ref().delete(location).await
+ }
- fn delete_stream<'a>(
- &'a self,
- locations: BoxStream<'a, Result<Path>>,
- ) -> BoxStream<'a, Result<Path>> {
- self.as_ref().delete_stream(locations)
- }
+ fn delete_stream<'a>(
+ &'a self,
+ locations: BoxStream<'a, Result<Path>>,
+ ) -> BoxStream<'a, Result<Path>> {
+ self.as_ref().delete_stream(locations)
+ }
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.as_ref().list(prefix).await
- }
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ self.as_ref().list(prefix).await
+ }
- async fn list_with_offset(
- &self,
- prefix: Option<&Path>,
- offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.as_ref().list_with_offset(prefix, offset).await
- }
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ self.as_ref().list_with_offset(prefix, offset).await
+ }
- async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
- self.as_ref().list_with_delimiter(prefix).await
- }
+ async fn list_with_delimiter(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<ListResult> {
+ self.as_ref().list_with_delimiter(prefix).await
+ }
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.as_ref().copy(from, to).await
- }
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ self.as_ref().copy(from, to).await
+ }
- async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
- self.as_ref().rename(from, to).await
- }
+ async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+ self.as_ref().rename(from, to).await
+ }
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- self.as_ref().copy_if_not_exists(from, to).await
- }
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ self.as_ref().copy_if_not_exists(from, to).await
+ }
- async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- self.as_ref().rename_if_not_exists(from, to).await
- }
+ async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ self.as_ref().rename_if_not_exists(from, to).await
+ }
+ }
+ };
}
+as_ref_impl!(Arc<dyn ObjectStore>);
+as_ref_impl!(Box<dyn ObjectStore>);
+
/// Result of a list call that includes objects, prefixes (directories) and a
/// token for the next set of results. Individual result sets may be limited to
/// 1,000 objects based on the underlying object storage's limitations.
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 98b3a15ee..cfc2ac823 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -415,6 +415,32 @@ mod tests {
stream_get(&integration).await;
}
+ #[tokio::test]
+ async fn box_test() {
+ let integration: Box<dyn ObjectStore> = Box::new(InMemory::new());
+
+ put_get_delete_list(&integration).await;
+ get_opts(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ rename_and_copy(&integration).await;
+ copy_if_not_exists(&integration).await;
+ stream_get(&integration).await;
+ }
+
+ #[tokio::test]
+ async fn arc_test() {
+ let integration: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+
+ put_get_delete_list(&integration).await;
+ get_opts(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ rename_and_copy(&integration).await;
+ copy_if_not_exists(&integration).await;
+ stream_get(&integration).await;
+ }
+
#[tokio::test]
async fn unknown_length() {
let integration = InMemory::new();