You are viewing a plain-text version of this message. The canonical version is available at the original mailing-list archive link, which was not preserved in this copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/14 15:36:48 UTC

[arrow-rs] branch master updated: Implement list_with_offset for PrefixStore (#4203)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 430286725 Implement list_with_offset for PrefixStore (#4203)
430286725 is described below

commit 43028672557d2558509608612c7cfbbd4bcb0dec
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sun May 14 16:36:41 2023 +0100

    Implement list_with_offset for PrefixStore (#4203)
---
 object_store/src/prefix.rs | 147 +++++++++++++++++++++------------------------
 1 file changed, 69 insertions(+), 78 deletions(-)

diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index eba379553..94836d33c 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -22,10 +22,7 @@ use std::ops::Range;
 use tokio::io::AsyncWrite;
 
 use crate::path::Path;
-use crate::{
-    GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
-    Result as ObjectStoreResult,
-};
+use crate::{GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};
 
 #[doc(hidden)]
 #[deprecated(note = "Use PrefixStore")]
@@ -59,36 +56,63 @@ impl<T: ObjectStore> PrefixStore<T> {
     }
 
     /// Strip the constant prefix from a given path
-    fn strip_prefix(&self, path: &Path) -> Option<Path> {
-        Some(path.prefix_match(&self.prefix)?.collect())
+    fn strip_prefix(&self, path: Path) -> Path {
+        // Note cannot use match because of borrow checker
+        if let Some(suffix) = path.prefix_match(&self.prefix) {
+            return suffix.collect();
+        }
+        path
+    }
+
+    /// Strip the constant prefix from a given ObjectMeta
+    fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
+        ObjectMeta {
+            last_modified: meta.last_modified,
+            size: meta.size,
+            location: self.strip_prefix(meta.location),
+            e_tag: meta.e_tag,
+        }
     }
 }
 
 #[async_trait::async_trait]
 impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
-    async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
         let full_path = self.full_path(location);
         self.inner.put(&full_path, bytes).await
     }
 
+    async fn put_multipart(
+        &self,
+        location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        let full_path = self.full_path(location);
+        self.inner.put_multipart(&full_path).await
+    }
+
+    async fn abort_multipart(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> Result<()> {
+        let full_path = self.full_path(location);
+        self.inner.abort_multipart(&full_path, multipart_id).await
+    }
+
     async fn append(
         &self,
         location: &Path,
-    ) -> ObjectStoreResult<Box<dyn AsyncWrite + Unpin + Send>> {
+    ) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
         let full_path = self.full_path(location);
         self.inner.append(&full_path).await
     }
 
-    async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
+    async fn get(&self, location: &Path) -> Result<GetResult> {
         let full_path = self.full_path(location);
         self.inner.get(&full_path).await
     }
 
-    async fn get_range(
-        &self,
-        location: &Path,
-        range: Range<usize>,
-    ) -> ObjectStoreResult<Bytes> {
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
         let full_path = self.full_path(location);
         self.inner.get_range(&full_path, range).await
     }
@@ -97,22 +121,18 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         &self,
         location: &Path,
         ranges: &[Range<usize>],
-    ) -> ObjectStoreResult<Vec<Bytes>> {
+    ) -> Result<Vec<Bytes>> {
         let full_path = self.full_path(location);
         self.inner.get_ranges(&full_path, ranges).await
     }
 
-    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
         let full_path = self.full_path(location);
-        self.inner.head(&full_path).await.map(|meta| ObjectMeta {
-            last_modified: meta.last_modified,
-            size: meta.size,
-            location: self.strip_prefix(&meta.location).unwrap_or(meta.location),
-            e_tag: meta.e_tag,
-        })
+        let meta = self.inner.head(&full_path).await?;
+        Ok(self.strip_meta(meta))
     }
 
-    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
+    async fn delete(&self, location: &Path) -> Result<()> {
         let full_path = self.full_path(location);
         self.inner.delete(&full_path).await
     }
@@ -120,94 +140,65 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
     async fn list(
         &self,
         prefix: Option<&Path>,
-    ) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
-        Ok(self
-            .inner
-            .list(Some(&self.full_path(prefix.unwrap_or(&Path::from("/")))))
-            .await?
-            .map_ok(|meta| ObjectMeta {
-                last_modified: meta.last_modified,
-                size: meta.size,
-                location: self.strip_prefix(&meta.location).unwrap_or(meta.location),
-                e_tag: meta.e_tag,
-            })
-            .boxed())
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
+        let s = self.inner.list(Some(&prefix)).await?;
+        Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed())
     }
 
-    async fn list_with_delimiter(
+    async fn list_with_offset(
         &self,
         prefix: Option<&Path>,
-    ) -> ObjectStoreResult<ListResult> {
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let offset = self.full_path(offset);
+        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
+        let s = self.inner.list_with_offset(Some(&prefix), &offset).await?;
+        Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed())
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
         self.inner
-            .list_with_delimiter(Some(
-                &self.full_path(prefix.unwrap_or(&Path::from("/"))),
-            ))
+            .list_with_delimiter(Some(&prefix))
             .await
             .map(|lst| ListResult {
                 common_prefixes: lst
                     .common_prefixes
-                    .iter()
-                    .filter_map(|p| self.strip_prefix(p))
+                    .into_iter()
+                    .map(|p| self.strip_prefix(p))
                     .collect(),
                 objects: lst
                     .objects
-                    .iter()
-                    .filter_map(|meta| {
-                        Some(ObjectMeta {
-                            last_modified: meta.last_modified,
-                            size: meta.size,
-                            location: self.strip_prefix(&meta.location)?,
-                            e_tag: meta.e_tag.clone(),
-                        })
-                    })
+                    .into_iter()
+                    .map(|meta| self.strip_meta(meta))
                     .collect(),
             })
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
         let full_from = self.full_path(from);
         let full_to = self.full_path(to);
         self.inner.copy(&full_from, &full_to).await
     }
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
         let full_from = self.full_path(from);
         let full_to = self.full_path(to);
-        self.inner.copy_if_not_exists(&full_from, &full_to).await
+        self.inner.rename(&full_from, &full_to).await
     }
 
-    async fn rename(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
         let full_from = self.full_path(from);
         let full_to = self.full_path(to);
-        self.inner.rename(&full_from, &full_to).await
+        self.inner.copy_if_not_exists(&full_from, &full_to).await
     }
 
-    async fn rename_if_not_exists(
-        &self,
-        from: &Path,
-        to: &Path,
-    ) -> ObjectStoreResult<()> {
+    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
         let full_from = self.full_path(from);
         let full_to = self.full_path(to);
         self.inner.rename_if_not_exists(&full_from, &full_to).await
     }
-
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        let full_path = self.full_path(location);
-        self.inner.put_multipart(&full_path).await
-    }
-
-    async fn abort_multipart(
-        &self,
-        location: &Path,
-        multipart_id: &MultipartId,
-    ) -> ObjectStoreResult<()> {
-        let full_path = self.full_path(location);
-        self.inner.abort_multipart(&full_path, multipart_id).await
-    }
 }
 
 #[cfg(test)]