You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/15 14:29:20 UTC

[arrow-datafusion] branch master updated: Update to object_store 0.4 (#3089)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ddad472a Update to object_store 0.4 (#3089)
5ddad472a is described below

commit 5ddad472acbbe2073aa6ac901ee5115f5830cc0f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Aug 15 15:29:15 2022 +0100

    Update to object_store 0.4 (#3089)
    
    * Update to pre-release object_store
    
    * Update to object_store 0.4
---
 datafusion/common/Cargo.toml                       |  2 +-
 datafusion/core/Cargo.toml                         |  2 +-
 .../core/src/datasource/file_format/parquet.rs     | 19 ++++++++++++++++-
 .../src/physical_plan/file_format/chunked_store.rs | 18 +++++++++++++++-
 .../core/src/physical_plan/file_format/parquet.rs  | 24 ++++++++++++++++++++++
 datafusion/core/tests/path_partition.rs            | 20 +++++++++++++++++-
 6 files changed, 80 insertions(+), 5 deletions(-)

diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 35ed2bb2a..36bb2d69f 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -42,7 +42,7 @@ apache-avro = { version = "0.14", features = ["snappy"], optional = true }
 arrow = { version = "20.0.0", features = ["prettyprint"] }
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
 cranelift-module = { version = "0.86.1", optional = true }
-object_store = { version = "0.3", optional = true }
+object_store = { version = "0.4", optional = true }
 ordered-float = "3.0"
 parquet = { version = "20.0.0", features = ["arrow"], optional = true }
 pyo3 = { version = "0.16", optional = true }
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 4ef1af708..f417f9ef2 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -75,7 +75,7 @@ lazy_static = { version = "^1.4.0" }
 log = "^0.4"
 num-traits = { version = "0.2", optional = true }
 num_cpus = "1.13.0"
-object_store = "0.3.0"
+object_store = "0.4.0"
 ordered-float = "3.0"
 parking_lot = "0.12"
 parquet = { version = "20.0.0", features = ["arrow", "async"] }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 5fe8fcba8..0db6fa634 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -579,7 +579,8 @@ mod tests {
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
-    use object_store::{GetResult, ListResult};
+    use object_store::{GetResult, ListResult, MultipartId};
+    use tokio::io::AsyncWrite;
 
     #[tokio::test]
     async fn read_merged_batches() -> Result<()> {
@@ -653,6 +654,22 @@ mod tests {
             Err(object_store::Error::NotImplemented)
         }
 
+        async fn put_multipart(
+            &self,
+            _location: &Path,
+        ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>
+        {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn abort_multipart(
+            &self,
+            _location: &Path,
+            _multipart_id: &MultipartId,
+        ) -> object_store::Result<()> {
+            Err(object_store::Error::NotImplemented)
+        }
+
         async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
             Err(object_store::Error::NotImplemented)
         }
diff --git a/datafusion/core/src/physical_plan/file_format/chunked_store.rs b/datafusion/core/src/physical_plan/file_format/chunked_store.rs
index 216926b06..1a48804a2 100644
--- a/datafusion/core/src/physical_plan/file_format/chunked_store.rs
+++ b/datafusion/core/src/physical_plan/file_format/chunked_store.rs
@@ -20,11 +20,12 @@ use bytes::Bytes;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use object_store::path::Path;
-use object_store::Result;
 use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use object_store::{MultipartId, Result};
 use std::fmt::{Debug, Display, Formatter};
 use std::ops::Range;
 use std::sync::Arc;
+use tokio::io::AsyncWrite;
 
 /// Wraps a [`ObjectStore`] and makes its get response return chunks
 ///
@@ -53,6 +54,21 @@ impl ObjectStore for ChunkedStore {
         self.inner.put(location, bytes).await
     }
 
+    async fn put_multipart(
+        &self,
+        location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        self.inner.put_multipart(location).await
+    }
+
+    async fn abort_multipart(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> Result<()> {
+        self.inner.abort_multipart(location, multipart_id).await
+    }
+
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let bytes = self.inner.get(location).await?.bytes().await?;
         let mut offset = 0;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index e4f62113f..0fddd984c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -431,6 +431,30 @@ impl AsyncFileReader for ParquetFileReader {
             .boxed()
     }
 
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total = ranges.iter().map(|r| r.end - r.start).sum();
+        self.metrics.bytes_scanned.add(total);
+
+        async move {
+            self.store
+                .get_ranges(&self.meta.location, &ranges)
+                .await
+                .map_err(|e| {
+                    ParquetError::General(format!(
+                        "AsyncChunkReader::get_byte_ranges error: {}",
+                        e
+                    ))
+                })
+        }
+        .boxed()
+    }
+
     fn get_metadata(
         &mut self,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 821d174f2..fca9b9a43 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -40,7 +40,10 @@ use datafusion::{
 use datafusion_common::ScalarValue;
 use futures::stream::BoxStream;
 use futures::{stream, StreamExt};
-use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore};
+use object_store::{
+    path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+};
+use tokio::io::AsyncWrite;
 
 #[tokio::test]
 async fn parquet_distinct_partition_col() -> Result<()> {
@@ -516,6 +519,21 @@ impl ObjectStore for MirroringObjectStore {
         unimplemented!()
     }
 
+    async fn put_multipart(
+        &self,
+        _location: &Path,
+    ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        unimplemented!()
+    }
+
+    async fn abort_multipart(
+        &self,
+        _location: &Path,
+        _multipart_id: &MultipartId,
+    ) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
     async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
         self.files.iter().find(|x| *x == location.as_ref()).unwrap();
         let path = std::path::PathBuf::from(&self.mirrored_file);