You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/02 17:40:05 UTC

[arrow-rs] branch master updated: Fix ObjectStore::get_range for GetResult::File (#4350) (#4351)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ec0f75aea Fix ObjectStore::get_range for GetResult::File (#4350) (#4351)
ec0f75aea is described below

commit ec0f75aeabd07d06395b70ce2e4e3573da520ba7
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Jun 2 18:39:59 2023 +0100

    Fix ObjectStore::get_range for GetResult::File (#4350) (#4351)
    
    * Fix ObjectStore::get_range for GetResult::File (#4350)
    
    * Review feedback
---
 object_store/Cargo.toml              |   4 ++
 object_store/src/lib.rs              |  14 ++++-
 object_store/src/local.rs            |   2 +-
 object_store/tests/get_range_file.rs | 116 +++++++++++++++++++++++++++++++++++
 4 files changed, 133 insertions(+), 3 deletions(-)

diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 28bf29f7f..4002a1865 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -75,3 +75,7 @@ tempfile = "3.1.0"
 futures-test = "0.3"
 rand = "0.8"
 hyper = { version = "0.14.24", features = ["server"] }
+
+[[test]]
+name = "get_range_file"
+path = "tests/get_range_file.rs"
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 98bbb7adc..864cabc4a 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -359,10 +359,20 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// in the given byte range
     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
         let options = GetOptions {
-            range: Some(range),
+            range: Some(range.clone()),
             ..Default::default()
         };
-        self.get_opts(location, options).await?.bytes().await
+        // Temporary until GetResult::File supports range (#4352)
+        match self.get_opts(location, options).await? {
+            GetResult::Stream(s) => collect_bytes(s, None).await,
+            #[cfg(not(target_arch = "wasm32"))]
+            GetResult::File(mut file, path) => {
+                maybe_spawn_blocking(move || local::read_range(&mut file, &path, range))
+                    .await
+            }
+            #[cfg(target_arch = "wasm32")]
+            _ => unimplemented!("File IO not implemented on wasm32."),
+        }
     }
 
     /// Return the bytes that are stored at the specified location
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 6039f8dba..ffff6a573 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -863,7 +863,7 @@ impl AsyncWrite for LocalUpload {
     }
 }
 
-fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
+pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
     let to_read = range.end - range.start;
     file.seek(SeekFrom::Start(range.start as u64))
         .context(SeekSnafu { path })?;
diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs
new file mode 100644
index 000000000..f926e3b07
--- /dev/null
+++ b/object_store/tests/get_range_file.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests that the default implementation of get_range handles GetResult::File correctly (#4350)
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use object_store::local::LocalFileSystem;
+use object_store::path::Path;
+use object_store::{
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+};
+use std::fmt::Formatter;
+use tempfile::tempdir;
+use tokio::io::AsyncWrite;
+
+#[derive(Debug)]
+struct MyStore(LocalFileSystem);
+
+impl std::fmt::Display for MyStore {
+    fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
+        todo!()
+    }
+}
+
+#[async_trait]
+impl ObjectStore for MyStore {
+    async fn put(&self, path: &Path, data: Bytes) -> object_store::Result<()> {
+        self.0.put(path, data).await
+    }
+
+    async fn put_multipart(
+        &self,
+        _: &Path,
+    ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        todo!()
+    }
+
+    async fn abort_multipart(
+        &self,
+        _: &Path,
+        _: &MultipartId,
+    ) -> object_store::Result<()> {
+        todo!()
+    }
+
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> object_store::Result<GetResult> {
+        self.0.get_opts(location, options).await
+    }
+
+    async fn head(&self, _: &Path) -> object_store::Result<ObjectMeta> {
+        todo!()
+    }
+
+    async fn delete(&self, _: &Path) -> object_store::Result<()> {
+        todo!()
+    }
+
+    async fn list(
+        &self,
+        _: Option<&Path>,
+    ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
+        todo!()
+    }
+
+    async fn list_with_delimiter(
+        &self,
+        _: Option<&Path>,
+    ) -> object_store::Result<ListResult> {
+        todo!()
+    }
+
+    async fn copy(&self, _: &Path, _: &Path) -> object_store::Result<()> {
+        todo!()
+    }
+
+    async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> object_store::Result<()> {
+        todo!()
+    }
+}
+
+#[tokio::test]
+async fn test_get_range() {
+    let tmp = tempdir().unwrap();
+    let store = MyStore(LocalFileSystem::new_with_prefix(tmp.path()).unwrap());
+    let path = Path::from("foo");
+
+    let expected = Bytes::from_static(b"hello world");
+    store.put(&path, expected.clone()).await.unwrap();
+    let fetched = store.get(&path).await.unwrap().bytes().await.unwrap();
+    assert_eq!(expected, fetched);
+
+    for range in [0..10, 3..5, 0..expected.len()] {
+        let data = store.get_range(&path, range.clone()).await.unwrap();
+        assert_eq!(&data[..], &expected[range])
+    }
+}