You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/14 15:36:48 UTC
[arrow-rs] branch master updated: Implement list_with_offset for PrefixStore (#4203)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 430286725 Implement list_with_offset for PrefixStore (#4203)
430286725 is described below
commit 43028672557d2558509608612c7cfbbd4bcb0dec
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sun May 14 16:36:41 2023 +0100
Implement list_with_offset for PrefixStore (#4203)
---
object_store/src/prefix.rs | 147 +++++++++++++++++++++------------------------
1 file changed, 69 insertions(+), 78 deletions(-)
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index eba379553..94836d33c 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -22,10 +22,7 @@ use std::ops::Range;
use tokio::io::AsyncWrite;
use crate::path::Path;
-use crate::{
- GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
- Result as ObjectStoreResult,
-};
+use crate::{GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};
#[doc(hidden)]
#[deprecated(note = "Use PrefixStore")]
@@ -59,36 +56,63 @@ impl<T: ObjectStore> PrefixStore<T> {
}
/// Strip the constant prefix from a given path
- fn strip_prefix(&self, path: &Path) -> Option<Path> {
- Some(path.prefix_match(&self.prefix)?.collect())
+ fn strip_prefix(&self, path: Path) -> Path {
+ // Note cannot use match because of borrow checker
+ if let Some(suffix) = path.prefix_match(&self.prefix) {
+ return suffix.collect();
+ }
+ path
+ }
+
+ /// Strip the constant prefix from a given ObjectMeta
+ fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
+ ObjectMeta {
+ last_modified: meta.last_modified,
+ size: meta.size,
+ location: self.strip_prefix(meta.location),
+ e_tag: meta.e_tag,
+ }
}
}
#[async_trait::async_trait]
impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
- async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let full_path = self.full_path(location);
self.inner.put(&full_path, bytes).await
}
+ async fn put_multipart(
+ &self,
+ location: &Path,
+ ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ let full_path = self.full_path(location);
+ self.inner.put_multipart(&full_path).await
+ }
+
+ async fn abort_multipart(
+ &self,
+ location: &Path,
+ multipart_id: &MultipartId,
+ ) -> Result<()> {
+ let full_path = self.full_path(location);
+ self.inner.abort_multipart(&full_path, multipart_id).await
+ }
+
async fn append(
&self,
location: &Path,
- ) -> ObjectStoreResult<Box<dyn AsyncWrite + Unpin + Send>> {
+ ) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
let full_path = self.full_path(location);
self.inner.append(&full_path).await
}
- async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
+ async fn get(&self, location: &Path) -> Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get(&full_path).await
}
- async fn get_range(
- &self,
- location: &Path,
- range: Range<usize>,
- ) -> ObjectStoreResult<Bytes> {
+ async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let full_path = self.full_path(location);
self.inner.get_range(&full_path, range).await
}
@@ -97,22 +121,18 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
&self,
location: &Path,
ranges: &[Range<usize>],
- ) -> ObjectStoreResult<Vec<Bytes>> {
+ ) -> Result<Vec<Bytes>> {
let full_path = self.full_path(location);
self.inner.get_ranges(&full_path, ranges).await
}
- async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let full_path = self.full_path(location);
- self.inner.head(&full_path).await.map(|meta| ObjectMeta {
- last_modified: meta.last_modified,
- size: meta.size,
- location: self.strip_prefix(&meta.location).unwrap_or(meta.location),
- e_tag: meta.e_tag,
- })
+ let meta = self.inner.head(&full_path).await?;
+ Ok(self.strip_meta(meta))
}
- async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
+ async fn delete(&self, location: &Path) -> Result<()> {
let full_path = self.full_path(location);
self.inner.delete(&full_path).await
}
@@ -120,94 +140,65 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
async fn list(
&self,
prefix: Option<&Path>,
- ) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
- Ok(self
- .inner
- .list(Some(&self.full_path(prefix.unwrap_or(&Path::from("/")))))
- .await?
- .map_ok(|meta| ObjectMeta {
- last_modified: meta.last_modified,
- size: meta.size,
- location: self.strip_prefix(&meta.location).unwrap_or(meta.location),
- e_tag: meta.e_tag,
- })
- .boxed())
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
+ let s = self.inner.list(Some(&prefix)).await?;
+ Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed())
}
- async fn list_with_delimiter(
+ async fn list_with_offset(
&self,
prefix: Option<&Path>,
- ) -> ObjectStoreResult<ListResult> {
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let offset = self.full_path(offset);
+ let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
+ let s = self.inner.list_with_offset(Some(&prefix), &offset).await?;
+ Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed())
+ }
+
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+ let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
self.inner
- .list_with_delimiter(Some(
- &self.full_path(prefix.unwrap_or(&Path::from("/"))),
- ))
+ .list_with_delimiter(Some(&prefix))
.await
.map(|lst| ListResult {
common_prefixes: lst
.common_prefixes
- .iter()
- .filter_map(|p| self.strip_prefix(p))
+ .into_iter()
+ .map(|p| self.strip_prefix(p))
.collect(),
objects: lst
.objects
- .iter()
- .filter_map(|meta| {
- Some(ObjectMeta {
- last_modified: meta.last_modified,
- size: meta.size,
- location: self.strip_prefix(&meta.location)?,
- e_tag: meta.e_tag.clone(),
- })
- })
+ .into_iter()
+ .map(|meta| self.strip_meta(meta))
.collect(),
})
}
- async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
self.inner.copy(&full_from, &full_to).await
}
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+ async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
- self.inner.copy_if_not_exists(&full_from, &full_to).await
+ self.inner.rename(&full_from, &full_to).await
}
- async fn rename(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
- self.inner.rename(&full_from, &full_to).await
+ self.inner.copy_if_not_exists(&full_from, &full_to).await
}
- async fn rename_if_not_exists(
- &self,
- from: &Path,
- to: &Path,
- ) -> ObjectStoreResult<()> {
+ async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
self.inner.rename_if_not_exists(&full_from, &full_to).await
}
-
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- let full_path = self.full_path(location);
- self.inner.put_multipart(&full_path).await
- }
-
- async fn abort_multipart(
- &self,
- location: &Path,
- multipart_id: &MultipartId,
- ) -> ObjectStoreResult<()> {
- let full_path = self.full_path(location);
- self.inner.abort_multipart(&full_path, multipart_id).await
- }
}
#[cfg(test)]