You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/15 14:29:20 UTC
[arrow-datafusion] branch master updated: Update to object_store 0.4 (#3089)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5ddad472a Update to object_store 0.4 (#3089)
5ddad472a is described below
commit 5ddad472acbbe2073aa6ac901ee5115f5830cc0f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Aug 15 15:29:15 2022 +0100
Update to object_store 0.4 (#3089)
* Update to pre-release object_store
* Update to object_store 0.4
---
datafusion/common/Cargo.toml | 2 +-
datafusion/core/Cargo.toml | 2 +-
.../core/src/datasource/file_format/parquet.rs | 19 ++++++++++++++++-
.../src/physical_plan/file_format/chunked_store.rs | 18 +++++++++++++++-
.../core/src/physical_plan/file_format/parquet.rs | 24 ++++++++++++++++++++++
datafusion/core/tests/path_partition.rs | 20 +++++++++++++++++-
6 files changed, 80 insertions(+), 5 deletions(-)
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 35ed2bb2a..36bb2d69f 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -42,7 +42,7 @@ apache-avro = { version = "0.14", features = ["snappy"], optional = true }
arrow = { version = "20.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.86.1", optional = true }
-object_store = { version = "0.3", optional = true }
+object_store = { version = "0.4", optional = true }
ordered-float = "3.0"
parquet = { version = "20.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 4ef1af708..f417f9ef2 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -75,7 +75,7 @@ lazy_static = { version = "^1.4.0" }
log = "^0.4"
num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
-object_store = "0.3.0"
+object_store = "0.4.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "20.0.0", features = ["arrow", "async"] }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 5fe8fcba8..0db6fa634 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -579,7 +579,8 @@ mod tests {
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
- use object_store::{GetResult, ListResult};
+ use object_store::{GetResult, ListResult, MultipartId};
+ use tokio::io::AsyncWrite;
#[tokio::test]
async fn read_merged_batches() -> Result<()> {
@@ -653,6 +654,22 @@ mod tests {
Err(object_store::Error::NotImplemented)
}
+ async fn put_multipart(
+ &self,
+ _location: &Path,
+ ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>
+ {
+ Err(object_store::Error::NotImplemented)
+ }
+
+ async fn abort_multipart(
+ &self,
+ _location: &Path,
+ _multipart_id: &MultipartId,
+ ) -> object_store::Result<()> {
+ Err(object_store::Error::NotImplemented)
+ }
+
async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
Err(object_store::Error::NotImplemented)
}
diff --git a/datafusion/core/src/physical_plan/file_format/chunked_store.rs b/datafusion/core/src/physical_plan/file_format/chunked_store.rs
index 216926b06..1a48804a2 100644
--- a/datafusion/core/src/physical_plan/file_format/chunked_store.rs
+++ b/datafusion/core/src/physical_plan/file_format/chunked_store.rs
@@ -20,11 +20,12 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::path::Path;
-use object_store::Result;
use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use object_store::{MultipartId, Result};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
use std::sync::Arc;
+use tokio::io::AsyncWrite;
/// Wraps an [`ObjectStore`] and makes its get response return chunks
///
@@ -53,6 +54,21 @@ impl ObjectStore for ChunkedStore {
self.inner.put(location, bytes).await
}
+ async fn put_multipart(
+ &self,
+ location: &Path,
+ ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ self.inner.put_multipart(location).await
+ }
+
+ async fn abort_multipart(
+ &self,
+ location: &Path,
+ multipart_id: &MultipartId,
+ ) -> Result<()> {
+ self.inner.abort_multipart(location, multipart_id).await
+ }
+
async fn get(&self, location: &Path) -> Result<GetResult> {
let bytes = self.inner.get(location).await?.bytes().await?;
let mut offset = 0;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index e4f62113f..0fddd984c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -431,6 +431,30 @@ impl AsyncFileReader for ParquetFileReader {
.boxed()
}
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec<Range<usize>>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+ where
+ Self: Send,
+ {
+ let total = ranges.iter().map(|r| r.end - r.start).sum();
+ self.metrics.bytes_scanned.add(total);
+
+ async move {
+ self.store
+ .get_ranges(&self.meta.location, &ranges)
+ .await
+ .map_err(|e| {
+ ParquetError::General(format!(
+ "AsyncChunkReader::get_byte_ranges error: {}",
+ e
+ ))
+ })
+ }
+ .boxed()
+ }
+
fn get_metadata(
&mut self,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 821d174f2..fca9b9a43 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -40,7 +40,10 @@ use datafusion::{
use datafusion_common::ScalarValue;
use futures::stream::BoxStream;
use futures::{stream, StreamExt};
-use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore};
+use object_store::{
+ path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+};
+use tokio::io::AsyncWrite;
#[tokio::test]
async fn parquet_distinct_partition_col() -> Result<()> {
@@ -516,6 +519,21 @@ impl ObjectStore for MirroringObjectStore {
unimplemented!()
}
+ async fn put_multipart(
+ &self,
+ _location: &Path,
+ ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ unimplemented!()
+ }
+
+ async fn abort_multipart(
+ &self,
+ _location: &Path,
+ _multipart_id: &MultipartId,
+ ) -> object_store::Result<()> {
+ unimplemented!()
+ }
+
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
self.files.iter().find(|x| *x == location.as_ref()).unwrap();
let path = std::path::PathBuf::from(&self.mirrored_file);