Posted to commits@arrow.apache.org by al...@apache.org on 2022/08/09 20:35:15 UTC
[arrow-datafusion] branch master updated: Allow Overriding AsyncFileReader used by ParquetExec (#3051)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9956f80f1 Allow Overriding AsyncFileReader used by ParquetExec (#3051)
9956f80f1 is described below
commit 9956f80f197550051db7debae15d5c706afc22a3
Author: Kamil Konior <57...@users.noreply.github.com>
AuthorDate: Tue Aug 9 22:35:10 2022 +0200
Allow Overriding AsyncFileReader used by ParquetExec (#3051)
* add metadata_ext to part file, etc
* handle error
* fix compilation issues
* rename var
* fix tests compilation issues
* allow users to provide their own parquet reader factory
* move ThinFileReader into parquet module
* rename field reader to delegate
* convert if else to unwrap or else
* rename ThinFileReader to BoxedAsyncFileReader, add doc
* hide ParquetFileMetrics
* derive debug
* convert metadata_ext field into Any type
* add builder like method instead of modifying ctor
* make `ParquetFileReaderFactory` public to let users provide custom implementations
* imports cleanup and more docs
* try fix clippy failures
* Disable where_clauses_object_safety
* add test
* extract ParquetFileReaderFactory test to integration tests
* further cleanup
* fix: Add apache RAT license
* fix send
Co-authored-by: Raphael Taylor-Davies <r....@googlemail.com>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/core/src/datasource/file_format/mod.rs | 1 +
.../core/src/datasource/file_format/parquet.rs | 6 +-
datafusion/core/src/datasource/listing/helpers.rs | 40 ++--
datafusion/core/src/datasource/listing/mod.rs | 6 +
datafusion/core/src/lib.rs | 2 +
.../core/src/physical_plan/file_format/avro.rs | 15 +-
.../core/src/physical_plan/file_format/csv.rs | 22 +-
.../src/physical_plan/file_format/file_stream.rs | 49 ++--
.../core/src/physical_plan/file_format/json.rs | 18 +-
.../core/src/physical_plan/file_format/mod.rs | 32 ++-
.../core/src/physical_plan/file_format/parquet.rs | 236 ++++++++++++++-----
datafusion/core/tests/custom_parquet_reader.rs | 261 +++++++++++++++++++++
12 files changed, 561 insertions(+), 127 deletions(-)
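Before the diff itself, a minimal sketch of how this extension point is intended to be used: implement `ParquetFileReaderFactory` and attach it with `with_parquet_file_reader_factory` (both introduced below). `MyReaderFactory`, `attach`, and the `todo!` body are illustrative placeholders, not part of the commit:

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::file_format::{
        FileMeta, ParquetExec, ParquetFileReaderFactory,
    };
    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
    use parquet::arrow::async_reader::AsyncFileReader;

    #[derive(Debug)]
    struct MyReaderFactory;

    impl ParquetFileReaderFactory for MyReaderFactory {
        fn create_reader(
            &self,
            _partition_index: usize,
            _file_meta: FileMeta,
            _metadata_size_hint: Option<usize>,
            _metrics: &ExecutionPlanMetricsSet,
        ) -> Result<Box<dyn AsyncFileReader + Send>> {
            // Return any AsyncFileReader implementation here, e.g. one backed
            // by a cache or a custom transport instead of an ObjectStore
            todo!("construct an AsyncFileReader for the requested file")
        }
    }

    // Build the plan as usual: ParquetExec::new(base_config, predicate,
    // metadata_size_hint), then route its data access through the factory:
    fn attach(exec: ParquetExec) -> ParquetExec {
        exec.with_parquet_file_reader_factory(Arc::new(MyReaderFactory))
    }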
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 8a0d5b97e..b16525c7a 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -108,6 +108,7 @@ pub(crate) mod test_util {
object_meta: meta,
partition_values: vec![],
range: None,
+ extensions: None,
}]];
let exec = format
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 04b0b9d52..5fe8fcba8 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -354,7 +354,11 @@ fn summarize_min_max(
}
}
-pub(crate) async fn fetch_parquet_metadata(
+/// Fetches parquet metadata from the ObjectStore for a given object
+///
+/// This component is subject to **change** in the near future and is exposed for low-level integrations
+/// through [ParquetFileReaderFactory].
+pub async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
meta: &ObjectMeta,
size_hint: Option<usize>,
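With `fetch_parquet_metadata` public, a custom reader can delegate its metadata loading to it, as the integration test added at the end of this commit does. A condensed sketch (the return type is inferred from that test's usage):

    use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
    use datafusion::error::Result;
    use object_store::{ObjectMeta, ObjectStore};
    use parquet::file::metadata::ParquetMetaData;

    async fn load_metadata(
        store: &dyn ObjectStore,
        meta: &ObjectMeta,
        size_hint: Option<usize>,
    ) -> Result<ParquetMetaData> {
        // size_hint lets the footer be fetched in fewer round trips
        fetch_parquet_metadata(store, meta, size_hint).await
    }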
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 80fece684..6c018eda3 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -183,24 +183,27 @@ pub async fn pruned_partition_list<'a>(
// Note: We might avoid parsing the partition values if they are not used in any projection,
// but the cost of parsing will likely be far dominated by the time to fetch the listing from
// the object store.
- Ok(Box::pin(list.try_filter_map(move |file_meta| async move {
- let parsed_path = parse_partitions_for_path(
- table_path,
- &file_meta.location,
- table_partition_cols,
- )
- .map(|p| {
- p.iter()
- .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
- .collect()
- });
-
- Ok(parsed_path.map(|partition_values| PartitionedFile {
- partition_values,
- object_meta: file_meta,
- range: None,
- }))
- })))
+ Ok(Box::pin(list.try_filter_map(
+ move |object_meta| async move {
+ let parsed_path = parse_partitions_for_path(
+ table_path,
+ &object_meta.location,
+ table_partition_cols,
+ )
+ .map(|p| {
+ p.iter()
+ .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
+ .collect()
+ });
+
+ Ok(parsed_path.map(|partition_values| PartitionedFile {
+ partition_values,
+ object_meta,
+ range: None,
+ extensions: None,
+ }))
+ },
+ )))
} else {
// parse the partition values and serde them as a RecordBatch to filter them
let metas: Vec<_> = list.try_collect().await?;
@@ -317,6 +320,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
})
.collect(),
range: None,
+ extensions: None,
})
})
})
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 85d4b6f7d..27cf14122 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -28,6 +28,7 @@ use datafusion_common::ScalarValue;
use futures::Stream;
use object_store::{path::Path, ObjectMeta};
use std::pin::Pin;
+use std::sync::Arc;
pub use self::url::ListingTableUrl;
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
@@ -58,6 +59,8 @@ pub struct PartitionedFile {
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
+ /// An optional field for user-defined per-object metadata
+ pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
impl PartitionedFile {
@@ -71,6 +74,7 @@ impl PartitionedFile {
},
partition_values: vec![],
range: None,
+ extensions: None,
}
}
@@ -84,6 +88,7 @@ impl PartitionedFile {
},
partition_values: vec![],
range: Some(FileRange { start, end }),
+ extensions: None,
}
}
}
@@ -94,6 +99,7 @@ impl From<ObjectMeta> for PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
+ extensions: None,
}
}
}
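The new `extensions` field carries arbitrary per-file context from a table provider down to the scan, where it resurfaces in `FileMeta` (see the `FileStream` change below) and thus in a custom reader factory. A sketch of attaching a payload; the `String` value mirrors the integration test added at the end of this commit, and `tag` is a made-up helper:

    use std::sync::Arc;

    use datafusion::datasource::listing::PartitionedFile;
    use object_store::ObjectMeta;

    fn tag(object_meta: ObjectMeta) -> PartitionedFile {
        PartitionedFile {
            object_meta,
            partition_values: vec![],
            range: None,
            // Any Arc<dyn Any + Send + Sync> payload works; readers recover
            // it with extensions.as_ref().and_then(|e| e.downcast_ref::<T>())
            extensions: Some(Arc::new(String::from("some-user-defined-metadata"))),
        }
    }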
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index dcc508e65..0dd665628 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
#![warn(missing_docs, clippy::needless_borrow)]
+// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
+#![allow(where_clauses_object_safety)]
//! [DataFusion](https://github.com/apache/arrow-datafusion)
//! is an extensible query execution framework that uses
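For context on the new `allow`: `where_clauses_object_safety` is a future-compatibility lint that fires when a method of an object-safe trait carries a `where Self: ...` bound, which is exactly the shape of `AsyncFileReader::get_byte_ranges` that `BoxedAsyncFileReader` must mirror later in this diff. A minimal, assumed reproduction of the pattern:

    #![allow(where_clauses_object_safety)] // without this, rustc warns below

    trait Fetch {
        fn fetch(&mut self)
        where
            Self: Send;
    }

    struct Boxed(Box<dyn Fetch + Send>);

    impl Fetch for Boxed {
        // Repeating the `where Self: Send` bound, as BoxedAsyncFileReader
        // must for get_byte_ranges, is what trips the lint
        fn fetch(&mut self)
        where
            Self: Send,
        {
            self.0.fetch()
        }
    }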
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index a795a00f9..0b7841d88 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -151,11 +151,11 @@ impl ExecutionPlan for AvroExec {
#[cfg(feature = "avro")]
mod private {
use super::*;
- use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
+ use crate::physical_plan::file_format::FileMeta;
use bytes::Buf;
use futures::StreamExt;
- use object_store::{GetResult, ObjectMeta, ObjectStore};
+ use object_store::{GetResult, ObjectStore};
pub struct AvroConfig {
pub schema: SchemaRef,
@@ -185,12 +185,11 @@ mod private {
fn open(
&self,
store: Arc<dyn ObjectStore>,
- file: ObjectMeta,
- _range: Option<FileRange>,
- ) -> FileOpenFuture {
+ file_meta: FileMeta,
+ ) -> Result<FileOpenFuture> {
let config = self.config.clone();
- Box::pin(async move {
- match store.get(&file.location).await? {
+ Ok(Box::pin(async move {
+ match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let reader = config.open(file)?;
Ok(futures::stream::iter(reader).boxed())
@@ -201,7 +200,7 @@ mod private {
Ok(futures::stream::iter(reader).boxed())
}
}
- })
+ }))
}
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 46e18e27f..885bea870 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -20,21 +20,20 @@
use crate::error::{DataFusionError, Result};
use crate::execution::context::{SessionState, TaskContext};
use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
-};
-
-use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
+use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+use crate::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+};
use arrow::csv;
use arrow::datatypes::SchemaRef;
use bytes::Buf;
use futures::{StreamExt, TryStreamExt};
-use object_store::{GetResult, ObjectMeta, ObjectStore};
+use object_store::{GetResult, ObjectStore};
use std::any::Any;
use std::fs;
use std::path::Path;
@@ -201,12 +200,11 @@ impl FileOpener for CsvOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
- file: ObjectMeta,
- _range: Option<FileRange>,
- ) -> FileOpenFuture {
+ file_meta: FileMeta,
+ ) -> Result<FileOpenFuture> {
let config = self.config.clone();
- Box::pin(async move {
- match store.get(&file.location).await? {
+ Ok(Box::pin(async move {
+ match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
Ok(futures::stream::iter(config.open(file, true)).boxed())
}
@@ -222,7 +220,7 @@ impl FileOpener for CsvOpener {
.boxed())
}
}
- })
+ }))
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index f095fe28d..1a54d6852 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -32,14 +32,16 @@ use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::ObjectStore;
use datafusion_common::ScalarValue;
-use crate::datasource::listing::{FileRange, PartitionedFile};
+use crate::datasource::listing::PartitionedFile;
use crate::error::Result;
use crate::execution::context::TaskContext;
-use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
+use crate::physical_plan::file_format::{
+ FileMeta, FileScanConfig, PartitionColumnProjector,
+};
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
@@ -53,9 +55,8 @@ pub trait FileOpener: Unpin {
fn open(
&self,
store: Arc<dyn ObjectStore>,
- file: ObjectMeta,
- range: Option<FileRange>,
- ) -> FileOpenFuture;
+ file_meta: FileMeta,
+ ) -> Result<FileOpenFuture>;
}
/// A stream that iterates record batch by record batch, file over file.
@@ -205,21 +206,30 @@ impl<F: FileOpener> FileStream<F> {
loop {
match &mut self.state {
FileStreamState::Idle => {
- let file = match self.file_iter.pop_front() {
+ let part_file = match self.file_iter.pop_front() {
Some(file) => file,
None => return Poll::Ready(None),
};
+ let file_meta = FileMeta {
+ object_meta: part_file.object_meta,
+ range: part_file.range,
+ extensions: part_file.extensions,
+ };
+
self.file_stream_metrics.time_opening.start();
- let future = self.file_reader.open(
- self.object_store.clone(),
- file.object_meta,
- file.range,
- );
-
- self.state = FileStreamState::Open {
- future,
- partition_values: file.partition_values,
+
+ match self.file_reader.open(self.object_store.clone(), file_meta) {
+ Ok(future) => {
+ self.state = FileStreamState::Open {
+ future,
+ partition_values: part_file.partition_values,
+ }
+ }
+ Err(e) => {
+ self.state = FileStreamState::Error;
+ return Poll::Ready(Some(Err(e.into())));
+ }
}
}
FileStreamState::Open {
@@ -322,12 +332,11 @@ mod tests {
fn open(
&self,
_store: Arc<dyn ObjectStore>,
- _file: ObjectMeta,
- _range: Option<FileRange>,
- ) -> FileOpenFuture {
+ _file_meta: FileMeta,
+ ) -> Result<FileOpenFuture> {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
- futures::future::ready(Ok(stream)).boxed()
+ Ok(futures::future::ready(Ok(stream)).boxed())
}
}
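Under the new signature, an opener can reject a file before any I/O happens; the error surfaces through the `FileStreamState::Error` arm above. A sketch modeled on the test opener, written as if inside this module since `FileOpener` is crate-internal (`EmptyOpener` is a made-up name):

    use std::sync::Arc;

    use futures::{FutureExt, StreamExt};
    use object_store::ObjectStore;

    use crate::error::{DataFusionError, Result};
    use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
    use crate::physical_plan::file_format::FileMeta;

    struct EmptyOpener;

    impl FileOpener for EmptyOpener {
        fn open(
            &self,
            _store: Arc<dyn ObjectStore>,
            file_meta: FileMeta,
        ) -> Result<FileOpenFuture> {
            // Errors returned here (rather than from inside the future) put
            // the FileStream into FileStreamState::Error before any batch is read
            if file_meta.range.is_some() {
                return Err(DataFusionError::NotImplemented(
                    "range scans".to_string(),
                ));
            }
            let stream = futures::stream::empty().boxed();
            Ok(futures::future::ready(Ok(stream)).boxed())
        }
    }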
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 488de2208..10f148ad0 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -16,9 +16,6 @@
// under the License.
//! Execution plan for reading line-delimited JSON files
-use arrow::json::reader::DecoderOptions;
-
-use crate::datasource::listing::FileRange;
use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
use crate::execution::context::TaskContext;
@@ -27,14 +24,16 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
+use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
+use arrow::json::reader::DecoderOptions;
use arrow::{datatypes::SchemaRef, json};
use bytes::Buf;
use futures::{StreamExt, TryStreamExt};
-use object_store::{GetResult, ObjectMeta, ObjectStore};
+use object_store::{GetResult, ObjectStore};
use std::any::Any;
use std::fs;
use std::path::Path;
@@ -163,13 +162,12 @@ impl FileOpener for JsonOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
- file: ObjectMeta,
- _range: Option<FileRange>,
- ) -> FileOpenFuture {
+ file_meta: FileMeta,
+ ) -> Result<FileOpenFuture> {
let options = self.options.clone();
let schema = self.file_schema.clone();
- Box::pin(async move {
- match store.get(&file.location).await? {
+ Ok(Box::pin(async move {
+ match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let reader = json::Reader::new(file, schema.clone(), options);
Ok(futures::stream::iter(reader).boxed())
@@ -188,7 +186,7 @@ impl FileOpener for JsonOpener {
.boxed())
}
}
- })
+ }))
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index d7d70cd4b..31fa1f2d8 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -29,7 +29,7 @@ mod parquet;
pub(crate) use self::csv::plan_to_csv;
pub use self::csv::CsvExec;
pub(crate) use self::parquet::plan_to_parquet;
-pub use self::parquet::ParquetExec;
+pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
use arrow::{
array::{ArrayData, ArrayRef, DictionaryArray},
buffer::Buffer,
@@ -41,6 +41,7 @@ pub use avro::AvroExec;
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;
+use crate::datasource::listing::FileRange;
use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
use crate::{
error::{DataFusionError, Result},
@@ -50,6 +51,8 @@ use arrow::array::{new_null_array, UInt16BufferBuilder};
use arrow::record_batch::RecordBatchOptions;
use lazy_static::lazy_static;
use log::info;
+use object_store::path::Path;
+use object_store::ObjectMeta;
use std::{
collections::HashMap,
fmt::{Display, Formatter, Result as FmtResult},
@@ -401,6 +404,33 @@ fn create_dict_array(
))
}
+/// A single file or part of a file that should be read, along with its optional range and extensions
+pub struct FileMeta {
+ /// `ObjectMeta` for the file: its location (e.g. URL, filesystem path, etc), size and last modified time
+ pub object_meta: ObjectMeta,
+ /// An optional file range for a more fine-grained parallel execution
+ pub range: Option<FileRange>,
+ /// An optional field for user-defined per-object metadata
+ pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+}
+
+impl FileMeta {
+ /// The full path to the object
+ pub fn location(&self) -> &Path {
+ &self.object_meta.location
+ }
+}
+
+impl From<ObjectMeta> for FileMeta {
+ fn from(object_meta: ObjectMeta) -> Self {
+ Self {
+ object_meta,
+ range: None,
+ extensions: None,
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use crate::{
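`FileMeta` bundles what openers previously received as separate `ObjectMeta` and `Option<FileRange>` arguments; a bare `ObjectMeta` converts into it with `range` and `extensions` defaulting to `None`. For example (`describe` is a made-up helper):

    use datafusion::physical_plan::file_format::FileMeta;
    use object_store::ObjectMeta;

    fn describe(object_meta: ObjectMeta) -> String {
        // range and extensions default to None via the From impl above
        let file_meta = FileMeta::from(object_meta);
        format!("will open {}", file_meta.location())
    }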
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index bd5528c09..e4f62113f 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -24,6 +24,25 @@ use std::ops::Range;
use std::sync::Arc;
use std::{any::Any, convert::TryInto};
+use crate::datasource::file_format::parquet::fetch_parquet_metadata;
+use crate::datasource::listing::FileRange;
+use crate::physical_plan::file_format::file_stream::{
+ FileOpenFuture, FileOpener, FileStream,
+};
+use crate::physical_plan::file_format::FileMeta;
+use crate::{
+ error::{DataFusionError, Result},
+ execution::context::{SessionState, TaskContext},
+ physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+ physical_plan::{
+ expressions::PhysicalSortExpr,
+ file_format::{FileScanConfig, SchemaAdapter},
+ metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ Statistics,
+ },
+ scalar::ScalarValue,
+};
use arrow::datatypes::DataType;
use arrow::{
array::ArrayRef,
@@ -31,6 +50,8 @@ use arrow::{
error::ArrowError,
};
use bytes::Bytes;
+use datafusion_common::Column;
+use datafusion_expr::Expr;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use log::debug;
@@ -46,28 +67,6 @@ use parquet::file::{
};
use parquet::schema::types::ColumnDescriptor;
-use datafusion_common::Column;
-use datafusion_expr::Expr;
-
-use crate::datasource::file_format::parquet::fetch_parquet_metadata;
-use crate::datasource::listing::FileRange;
-use crate::physical_plan::file_format::file_stream::{
- FileOpenFuture, FileOpener, FileStream,
-};
-use crate::{
- error::{DataFusionError, Result},
- execution::context::{SessionState, TaskContext},
- physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
- physical_plan::{
- expressions::PhysicalSortExpr,
- file_format::{FileScanConfig, SchemaAdapter},
- metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
- Statistics,
- },
- scalar::ScalarValue,
-};
-
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
@@ -80,17 +79,8 @@ pub struct ParquetExec {
pruning_predicate: Option<PruningPredicate>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
-}
-
-/// Stores metrics about the parquet execution for a particular parquet file
-#[derive(Debug, Clone)]
-struct ParquetFileMetrics {
- /// Number of times the predicate could not be evaluated
- pub predicate_evaluation_errors: metrics::Count,
- /// Number of row groups pruned using
- pub row_groups_pruned: metrics::Count,
- /// Total number of bytes scanned
- pub bytes_scanned: metrics::Count,
+ /// Optional user-defined parquet file reader factory
+ parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
}
impl ParquetExec {
@@ -130,6 +120,7 @@ impl ParquetExec {
metrics,
pruning_predicate,
metadata_size_hint,
+ parquet_file_reader_factory: None,
}
}
@@ -142,6 +133,35 @@ impl ParquetExec {
pub fn pruning_predicate(&self) -> Option<&PruningPredicate> {
self.pruning_predicate.as_ref()
}
+
+ /// Optional user-defined parquet file reader factory.
+ ///
+ /// `ParquetFileReaderFactory` complements `TableProvider`; it enables users to provide a custom
+ /// implementation for data access operations.
+ ///
+ /// If a custom `ParquetFileReaderFactory` is provided, data access operations will be routed
+ /// to this factory instead of `ObjectStore`.
+ pub fn with_parquet_file_reader_factory(
+ mut self,
+ parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+ ) -> Self {
+ self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
+ self
+ }
+}
+
+/// Stores metrics about the parquet execution for a particular parquet file.
+///
+/// This component is subject to **change** in the near future and is exposed for low-level integrations
+/// through [ParquetFileReaderFactory].
+#[derive(Debug, Clone)]
+pub struct ParquetFileMetrics {
+ /// Number of times the predicate could not be evaluated
+ pub predicate_evaluation_errors: metrics::Count,
+ /// Number of row groups pruned using the pruning predicate
+ pub row_groups_pruned: metrics::Count,
+ /// Total number of bytes scanned
+ pub bytes_scanned: metrics::Count,
}
impl ParquetFileMetrics {
@@ -209,27 +229,41 @@ impl ExecutionPlan for ParquetExec {
fn execute(
&self,
partition_index: usize,
- context: Arc<TaskContext>,
+ ctx: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let projection = match self.base_config.file_column_projection_indices() {
Some(proj) => proj,
None => (0..self.base_config.file_schema.fields().len()).collect(),
};
+ let parquet_file_reader_factory = self
+ .parquet_file_reader_factory
+ .as_ref()
+ .map(|f| Ok(Arc::clone(f)))
+ .unwrap_or_else(|| {
+ ctx.runtime_env()
+ .object_store(&self.base_config.object_store_url)
+ .map(|store| {
+ Arc::new(DefaultParquetFileReaderFactory::new(store))
+ as Arc<dyn ParquetFileReaderFactory>
+ })
+ })?;
+
let opener = ParquetOpener {
partition_index,
projection: Arc::from(projection),
- batch_size: context.session_config().batch_size(),
+ batch_size: ctx.session_config().batch_size(),
pruning_predicate: self.pruning_predicate.clone(),
table_schema: self.base_config.file_schema.clone(),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics.clone(),
+ parquet_file_reader_factory,
};
let stream = FileStream::new(
&self.base_config,
partition_index,
- context,
+ ctx,
opener,
self.metrics.clone(),
)?;
@@ -284,34 +318,37 @@ struct ParquetOpener {
table_schema: SchemaRef,
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
+ parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
}
impl FileOpener for ParquetOpener {
fn open(
&self,
- store: Arc<dyn ObjectStore>,
- meta: ObjectMeta,
- range: Option<FileRange>,
- ) -> FileOpenFuture {
+ _: Arc<dyn ObjectStore>,
+ file_meta: FileMeta,
+ ) -> Result<FileOpenFuture> {
+ let file_range = file_meta.range.clone();
+
let metrics = ParquetFileMetrics::new(
self.partition_index,
- meta.location.as_ref(),
+ file_meta.location().as_ref(),
&self.metrics,
);
- let reader = ParquetFileReader {
- store,
- meta,
- metadata_size_hint: self.metadata_size_hint,
- metrics: metrics.clone(),
- };
+ let reader =
+ BoxedAsyncFileReader(self.parquet_file_reader_factory.create_reader(
+ self.partition_index,
+ file_meta,
+ self.metadata_size_hint,
+ &self.metrics,
+ )?);
let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
let batch_size = self.batch_size;
let projection = self.projection.clone();
let pruning_predicate = self.pruning_predicate.clone();
- Box::pin(async move {
+ Ok(Box::pin(async move {
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let adapted_projections =
schema_adapter.map_projections(builder.schema(), &projection)?;
@@ -322,7 +359,8 @@ impl FileOpener for ParquetOpener {
);
let groups = builder.metadata().row_groups();
- let row_groups = prune_row_groups(groups, range, pruning_predicate, &metrics);
+ let row_groups =
+ prune_row_groups(groups, file_range, pruning_predicate, &metrics);
let stream = builder
.with_projection(mask)
@@ -341,7 +379,32 @@ impl FileOpener for ParquetOpener {
});
Ok(adapted.boxed())
- })
+ }))
+ }
+}
+
+/// Factory of parquet file readers.
+///
+/// Provides a means to implement a custom data access interface.
+pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
+ /// Provides an `AsyncFileReader` over the parquet file specified in `FileMeta`
+ fn create_reader(
+ &self,
+ partition_index: usize,
+ file_meta: FileMeta,
+ metadata_size_hint: Option<usize>,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> Result<Box<dyn AsyncFileReader + Send>>;
+}
+
+#[derive(Debug)]
+pub struct DefaultParquetFileReaderFactory {
+ store: Arc<dyn ObjectStore>,
+}
+
+impl DefaultParquetFileReaderFactory {
+ pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+ Self { store }
}
}
@@ -349,8 +412,8 @@ impl FileOpener for ParquetOpener {
struct ParquetFileReader {
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
- metadata_size_hint: Option<usize>,
metrics: ParquetFileMetrics,
+ metadata_size_hint: Option<usize>,
}
impl AsyncFileReader for ParquetFileReader {
@@ -389,6 +452,63 @@ impl AsyncFileReader for ParquetFileReader {
}
}
+impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
+ fn create_reader(
+ &self,
+ partition_index: usize,
+ file_meta: FileMeta,
+ metadata_size_hint: Option<usize>,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> Result<Box<dyn AsyncFileReader + Send>> {
+ let parquet_file_metrics = ParquetFileMetrics::new(
+ partition_index,
+ file_meta.location().as_ref(),
+ metrics,
+ );
+
+ Ok(Box::new(ParquetFileReader {
+ meta: file_meta.object_meta,
+ store: Arc::clone(&self.store),
+ metadata_size_hint,
+ metrics: parquet_file_metrics,
+ }))
+ }
+}
+
+/// `BoxedAsyncFileReader` exists to satisfy the type requirements of the
+/// parquet stream builder constructor.
+///
+/// Temporary pending https://github.com/apache/arrow-rs/pull/2368
+struct BoxedAsyncFileReader(Box<dyn AsyncFileReader + Send>);
+
+impl AsyncFileReader for BoxedAsyncFileReader {
+ fn get_bytes(
+ &mut self,
+ range: Range<usize>,
+ ) -> BoxFuture<'_, ::parquet::errors::Result<Bytes>> {
+ self.0.get_bytes(range)
+ }
+
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec<Range<usize>>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+ // TODO: This where bound forces us to enable #![allow(where_clauses_object_safety)] (#3081)
+ // Upstream issue https://github.com/apache/arrow-rs/issues/2372
+ where
+ Self: Send,
+ {
+ self.0.get_byte_ranges(ranges)
+ }
+
+ fn get_metadata(
+ &mut self,
+ ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> {
+ self.0.get_metadata()
+ }
+}
+
/// Wraps parquet statistics in a way
/// that implements [`PruningStatistics`]
struct RowGroupPruningStatistics<'a> {
@@ -652,12 +772,6 @@ pub async fn plan_to_parquet(
#[cfg(test)]
mod tests {
- use crate::{
- assert_batches_sorted_eq, assert_contains,
- datasource::file_format::{parquet::ParquetFormat, FileFormat},
- physical_plan::collect,
- };
-
use super::*;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
@@ -666,6 +780,11 @@ mod tests {
use crate::execution::options::CsvReadOptions;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
+ use crate::{
+ assert_batches_sorted_eq, assert_contains,
+ datasource::file_format::{parquet::ParquetFormat, FileFormat},
+ physical_plan::collect,
+ };
use arrow::array::Float32Array;
use arrow::record_batch::RecordBatch;
use arrow::{
@@ -1086,6 +1205,7 @@ mod tests {
object_meta: meta.clone(),
partition_values: vec![],
range: Some(FileRange { start, end }),
+ extensions: None,
}
}
@@ -1188,6 +1308,7 @@ mod tests {
ScalarValue::Utf8(Some("26".to_owned())),
],
range: None,
+ extensions: None,
};
let parquet_exec = ParquetExec::new(
@@ -1250,6 +1371,7 @@ mod tests {
},
partition_values: vec![],
range: None,
+ extensions: None,
};
let parquet_exec = ParquetExec::new(
diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
new file mode 100644
index 000000000..ac8c98381
--- /dev/null
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -0,0 +1,261 @@
+// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
+#![allow(where_clauses_object_safety)]
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+mod tests {
+ use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};
+ use arrow::datatypes::{Field, Schema};
+ use arrow::record_batch::RecordBatch;
+ use bytes::Bytes;
+ use datafusion::assert_batches_sorted_eq;
+ use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
+ use datafusion::datasource::listing::PartitionedFile;
+ use datafusion::datasource::object_store::ObjectStoreUrl;
+ use datafusion::physical_plan::file_format::{
+ FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics,
+ ParquetFileReaderFactory,
+ };
+ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+ use datafusion::physical_plan::{collect, Statistics};
+ use datafusion::prelude::SessionContext;
+ use datafusion_common::DataFusionError;
+ use futures::future::BoxFuture;
+ use futures::{FutureExt, TryFutureExt};
+ use object_store::memory::InMemory;
+ use object_store::path::Path;
+ use object_store::{ObjectMeta, ObjectStore};
+ use parquet::arrow::async_reader::AsyncFileReader;
+ use parquet::arrow::ArrowWriter;
+ use parquet::errors::ParquetError;
+ use parquet::file::metadata::ParquetMetaData;
+ use std::io::Cursor;
+ use std::ops::Range;
+ use std::sync::Arc;
+ use std::time::SystemTime;
+
+ const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
+
+ #[tokio::test]
+ async fn route_data_access_ops_to_parquet_file_reader_factory() {
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+ let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+ let batch = create_batch(vec![
+ ("c1", c1.clone()),
+ ("c2", c2.clone()),
+ ("c3", c3.clone()),
+ ]);
+
+ let file_schema = batch.schema().clone();
+ let (in_memory_object_store, parquet_files_meta) =
+ store_parquet_in_memory(vec![batch]).await;
+ let file_groups = parquet_files_meta
+ .into_iter()
+ .map(|meta| PartitionedFile {
+ object_meta: meta,
+ partition_values: vec![],
+ range: None,
+ extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
+ })
+ .collect();
+
+ // prepare the scan
+ let parquet_exec = ParquetExec::new(
+ FileScanConfig {
+ // just any url that doesn't point to the in-memory object store
+ object_store_url: ObjectStoreUrl::local_filesystem(),
+ file_groups: vec![file_groups],
+ file_schema,
+ statistics: Statistics::default(),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ },
+ None,
+ None,
+ )
+ .with_parquet_file_reader_factory(Arc::new(
+ InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)),
+ ));
+
+ let session_ctx = SessionContext::new();
+
+ let task_ctx = session_ctx.task_ctx();
+ let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+ let expected = vec![
+ "+-----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+-----+----+----+",
+ "| Foo | 1 | 10 |",
+ "| | 2 | 20 |",
+ "| bar | | |",
+ "+-----+----+----+",
+ ];
+
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
+ #[derive(Debug)]
+ struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
+
+ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
+ fn create_reader(
+ &self,
+ partition_index: usize,
+ file_meta: FileMeta,
+ metadata_size_hint: Option<usize>,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> {
+ let metadata = file_meta
+ .extensions
+ .as_ref()
+ .expect("has user defined metadata");
+ let metadata = metadata
+ .downcast_ref::<String>()
+ .expect("has string metadata");
+
+ assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
+
+ let parquet_file_metrics = ParquetFileMetrics::new(
+ partition_index,
+ file_meta.location().as_ref(),
+ metrics,
+ );
+
+ Ok(Box::new(ParquetFileReader {
+ store: Arc::clone(&self.0),
+ meta: file_meta.object_meta,
+ metrics: parquet_file_metrics,
+ metadata_size_hint,
+ }))
+ }
+ }
+
+ fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+ columns.into_iter().fold(
+ RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+ |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+ )
+ }
+
+ fn add_to_batch(
+ batch: &RecordBatch,
+ field_name: &str,
+ array: ArrayRef,
+ ) -> RecordBatch {
+ let mut fields = batch.schema().fields().clone();
+ fields.push(Field::new(field_name, array.data_type().clone(), true));
+ let schema = Arc::new(Schema::new(fields));
+
+ let mut columns = batch.columns().to_vec();
+ columns.push(array);
+ RecordBatch::try_new(schema, columns).expect("error creating record batch")
+ }
+
+ async fn store_parquet_in_memory(
+ batches: Vec<RecordBatch>,
+ ) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
+ let in_memory = InMemory::new();
+
+ let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
+ .into_iter()
+ .enumerate()
+ .map(|(offset, batch)| {
+ let mut buf = Vec::<u8>::with_capacity(32 * 1024);
+ let mut output = Cursor::new(&mut buf);
+
+ let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
+ .expect("creating writer");
+
+ writer.write(&batch).expect("Writing batch");
+ writer.close().unwrap();
+
+ let meta = ObjectMeta {
+ location: Path::parse(format!("file-{offset}.parquet"))
+ .expect("creating path"),
+ last_modified: chrono::DateTime::from(SystemTime::now()),
+ size: buf.len(),
+ };
+
+ (meta, Bytes::from(buf))
+ })
+ .collect();
+
+ let mut objects = Vec::with_capacity(parquet_batches.len());
+ for (meta, bytes) in parquet_batches {
+ in_memory
+ .put(&meta.location, bytes)
+ .await
+ .expect("put parquet file into in memory object store");
+ objects.push(meta);
+ }
+
+ (Arc::new(in_memory), objects)
+ }
+
+ /// Implements [`AsyncFileReader`] for a parquet file in object storage
+ struct ParquetFileReader {
+ store: Arc<dyn ObjectStore>,
+ meta: ObjectMeta,
+ metrics: ParquetFileMetrics,
+ metadata_size_hint: Option<usize>,
+ }
+
+ impl AsyncFileReader for ParquetFileReader {
+ fn get_bytes(
+ &mut self,
+ range: Range<usize>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+ self.metrics.bytes_scanned.add(range.end - range.start);
+
+ self.store
+ .get_range(&self.meta.location, range)
+ .map_err(|e| {
+ ParquetError::General(format!(
+ "AsyncChunkReader::get_bytes error: {}",
+ e
+ ))
+ })
+ .boxed()
+ }
+
+ fn get_metadata(
+ &mut self,
+ ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+ Box::pin(async move {
+ let metadata = fetch_parquet_metadata(
+ self.store.as_ref(),
+ &self.meta,
+ self.metadata_size_hint,
+ )
+ .await
+ .map_err(|e| {
+ ParquetError::General(format!(
+ "AsyncChunkReader::get_metadata error: {}",
+ e
+ ))
+ })?;
+ Ok(Arc::new(metadata))
+ })
+ }
+ }
+}