You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/02 17:40:05 UTC
[arrow-rs] branch master updated: Fix ObjectStore::get_range for GetResult::File (#4350) (#4351)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ec0f75aea Fix ObjectStore::get_range for GetResult::File (#4350) (#4351)
ec0f75aea is described below
commit ec0f75aeabd07d06395b70ce2e4e3573da520ba7
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Jun 2 18:39:59 2023 +0100
Fix ObjectStore::get_range for GetResult::File (#4350) (#4351)
* Fix ObjectStore::get_range for GetResult::File (#4350)
* Review feedback
---
object_store/Cargo.toml | 4 ++
object_store/src/lib.rs | 14 ++++-
object_store/src/local.rs | 2 +-
object_store/tests/get_range_file.rs | 116 +++++++++++++++++++++++++++++++++++
4 files changed, 133 insertions(+), 3 deletions(-)
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 28bf29f7f..4002a1865 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -75,3 +75,7 @@ tempfile = "3.1.0"
futures-test = "0.3"
rand = "0.8"
hyper = { version = "0.14.24", features = ["server"] }
+
+[[test]]
+name = "get_range_file"
+path = "tests/get_range_file.rs"
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 98bbb7adc..864cabc4a 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -359,10 +359,20 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// in the given byte range
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let options = GetOptions {
- range: Some(range),
+ range: Some(range.clone()),
..Default::default()
};
- self.get_opts(location, options).await?.bytes().await
+ // Temporary until GetResult::File supports range (#4352)
+ match self.get_opts(location, options).await? {
+ GetResult::Stream(s) => collect_bytes(s, None).await,
+ #[cfg(not(target_arch = "wasm32"))]
+ GetResult::File(mut file, path) => {
+ maybe_spawn_blocking(move || local::read_range(&mut file, &path, range))
+ .await
+ }
+ #[cfg(target_arch = "wasm32")]
+ _ => unimplemented!("File IO not implemented on wasm32."),
+ }
}
/// Return the bytes that are stored at the specified location
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 6039f8dba..ffff6a573 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -863,7 +863,7 @@ impl AsyncWrite for LocalUpload {
}
}
-fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
+pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
let to_read = range.end - range.start;
file.seek(SeekFrom::Start(range.start as u64))
.context(SeekSnafu { path })?;
diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs
new file mode 100644
index 000000000..f926e3b07
--- /dev/null
+++ b/object_store/tests/get_range_file.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests the default implementation of get_range handles GetResult::File correctly (#4350)
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use object_store::local::LocalFileSystem;
+use object_store::path::Path;
+use object_store::{
+ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+};
+use std::fmt::Formatter;
+use tempfile::tempdir;
+use tokio::io::AsyncWrite;
+
+#[derive(Debug)]
+struct MyStore(LocalFileSystem);
+
+impl std::fmt::Display for MyStore {
+ fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
+ todo!()
+ }
+}
+
+#[async_trait]
+impl ObjectStore for MyStore {
+ async fn put(&self, path: &Path, data: Bytes) -> object_store::Result<()> {
+ self.0.put(path, data).await
+ }
+
+ async fn put_multipart(
+ &self,
+ _: &Path,
+ ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ todo!()
+ }
+
+ async fn abort_multipart(
+ &self,
+ _: &Path,
+ _: &MultipartId,
+ ) -> object_store::Result<()> {
+ todo!()
+ }
+
+ async fn get_opts(
+ &self,
+ location: &Path,
+ options: GetOptions,
+ ) -> object_store::Result<GetResult> {
+ self.0.get_opts(location, options).await
+ }
+
+ async fn head(&self, _: &Path) -> object_store::Result<ObjectMeta> {
+ todo!()
+ }
+
+ async fn delete(&self, _: &Path) -> object_store::Result<()> {
+ todo!()
+ }
+
+ async fn list(
+ &self,
+ _: Option<&Path>,
+ ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
+ todo!()
+ }
+
+ async fn list_with_delimiter(
+ &self,
+ _: Option<&Path>,
+ ) -> object_store::Result<ListResult> {
+ todo!()
+ }
+
+ async fn copy(&self, _: &Path, _: &Path) -> object_store::Result<()> {
+ todo!()
+ }
+
+ async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> object_store::Result<()> {
+ todo!()
+ }
+}
+
+#[tokio::test]
+async fn test_get_range() {
+ let tmp = tempdir().unwrap();
+ let store = MyStore(LocalFileSystem::new_with_prefix(tmp.path()).unwrap());
+ let path = Path::from("foo");
+
+ let expected = Bytes::from_static(b"hello world");
+ store.put(&path, expected.clone()).await.unwrap();
+ let fetched = store.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(expected, fetched);
+
+ for range in [0..10, 3..5, 0..expected.len()] {
+ let data = store.get_range(&path, range.clone()).await.unwrap();
+ assert_eq!(&data[..], &expected[range])
+ }
+}