Posted to commits@arrow.apache.org by al...@apache.org on 2022/08/09 20:35:15 UTC

[arrow-datafusion] branch master updated: Allow Overriding AsyncFileReader used by ParquetExec (#3051)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9956f80f1 Allow Overriding AsyncFileReader used by ParquetExec (#3051)
9956f80f1 is described below

commit 9956f80f197550051db7debae15d5c706afc22a3
Author: Kamil Konior <57...@users.noreply.github.com>
AuthorDate: Tue Aug 9 22:35:10 2022 +0200

    Allow Overriding AsyncFileReader used by ParquetExec (#3051)
    
    * add metadata_ext to part file, etc
    
    * handle error
    
    * fix compilation issues
    
    * rename var
    
    * fix tests compilation issues
    
    * allow user to provide their own parquet reader factory
    
    * move ThinFileReader into parquet module
    
    * rename field reader to delegate
    
    * convert if else to unwrap or else
    
    * rename ThinFileReader to BoxedAsyncFileReader, add doc
    
    * hide ParquetFileMetrics
    
    * derive debug
    
    * convert metadata_ext field into Any type
    
    * add builder like method instead of modifying ctor
    
    * make `ParquetFileReaderFactory` public to let users provide custom implementations
    
    * imports cleanup and more docs
    
    * try fix clippy failures
    
    * Disable where_clauses_object_safety
    
    * add test
    
    * extract ParquetFileReaderFactory test to integration tests
    
    * further cleanup
    
    * fix: Add apache RAT license
    
    * fix send
    
    Co-authored-by: Raphael Taylor-Davies <r....@googlemail.com>
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/core/src/datasource/file_format/mod.rs  |   1 +
 .../core/src/datasource/file_format/parquet.rs     |   6 +-
 datafusion/core/src/datasource/listing/helpers.rs  |  40 ++--
 datafusion/core/src/datasource/listing/mod.rs      |   6 +
 datafusion/core/src/lib.rs                         |   2 +
 .../core/src/physical_plan/file_format/avro.rs     |  15 +-
 .../core/src/physical_plan/file_format/csv.rs      |  22 +-
 .../src/physical_plan/file_format/file_stream.rs   |  49 ++--
 .../core/src/physical_plan/file_format/json.rs     |  18 +-
 .../core/src/physical_plan/file_format/mod.rs      |  32 ++-
 .../core/src/physical_plan/file_format/parquet.rs  | 236 ++++++++++++++-----
 datafusion/core/tests/custom_parquet_reader.rs     | 261 +++++++++++++++++++++
 12 files changed, 561 insertions(+), 127 deletions(-)
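
In short: the commit adds a public `ParquetFileReaderFactory` trait and a
`ParquetExec::with_parquet_file_reader_factory` builder method, so callers can
route all parquet data access through their own `AsyncFileReader` instead of the
registered `ObjectStore`. A minimal sketch of the intended call site (the helper
function and its name are illustrative, not part of the commit):

    use std::sync::Arc;
    use datafusion::physical_plan::file_format::{
        FileScanConfig, ParquetExec, ParquetFileReaderFactory,
    };

    /// Hypothetical helper: build a ParquetExec whose data access goes through
    /// a caller-supplied reader factory rather than the registered ObjectStore.
    fn parquet_exec_with_custom_reader(
        scan_config: FileScanConfig,
        reader_factory: Arc<dyn ParquetFileReaderFactory>,
    ) -> ParquetExec {
        // `new` takes the scan config, an optional pruning predicate and an
        // optional metadata size hint, as in the tests further down.
        ParquetExec::new(scan_config, None, None)
            .with_parquet_file_reader_factory(reader_factory)
    }

The integration test added at the end of this diff (`custom_parquet_reader.rs`)
exercises the same API end to end.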

diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 8a0d5b97e..b16525c7a 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -108,6 +108,7 @@ pub(crate) mod test_util {
             object_meta: meta,
             partition_values: vec![],
             range: None,
+            extensions: None,
         }]];
 
         let exec = format
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 04b0b9d52..5fe8fcba8 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -354,7 +354,11 @@ fn summarize_min_max(
     }
 }
 
-pub(crate) async fn fetch_parquet_metadata(
+/// Fetches parquet metadata from the ObjectStore for a given object
+///
+/// This component is subject to **change** in the near future and is exposed for low-level integrations
+/// through [ParquetFileReaderFactory].
+pub async fn fetch_parquet_metadata(
     store: &dyn ObjectStore,
     meta: &ObjectMeta,
     size_hint: Option<usize>,
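
Because `fetch_parquet_metadata` is now `pub`, code outside the crate (for
example a custom `AsyncFileReader`) can reuse it to fetch just the parquet
footer. A rough sketch, assuming a local object store and an illustrative path:

    use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
    use parquet::file::metadata::ParquetMetaData;

    // Sketch only: the path and the 64 KiB size hint are assumptions.
    async fn read_footer(path: &str) -> Result<ParquetMetaData, Box<dyn std::error::Error>> {
        let store = LocalFileSystem::new();
        let meta = store.head(&Path::parse(path)?).await?;
        // A size hint lets the footer be fetched in a single request.
        let metadata = fetch_parquet_metadata(&store, &meta, Some(64 * 1024)).await?;
        Ok(metadata)
    }
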
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 80fece684..6c018eda3 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -183,24 +183,27 @@ pub async fn pruned_partition_list<'a>(
         // Note: We might avoid parsing the partition values if they are not used in any projection,
         // but the cost of parsing will likely be far dominated by the time to fetch the listing from
         // the object store.
-        Ok(Box::pin(list.try_filter_map(move |file_meta| async move {
-            let parsed_path = parse_partitions_for_path(
-                table_path,
-                &file_meta.location,
-                table_partition_cols,
-            )
-            .map(|p| {
-                p.iter()
-                    .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
-                    .collect()
-            });
-
-            Ok(parsed_path.map(|partition_values| PartitionedFile {
-                partition_values,
-                object_meta: file_meta,
-                range: None,
-            }))
-        })))
+        Ok(Box::pin(list.try_filter_map(
+            move |object_meta| async move {
+                let parsed_path = parse_partitions_for_path(
+                    table_path,
+                    &object_meta.location,
+                    table_partition_cols,
+                )
+                .map(|p| {
+                    p.iter()
+                        .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
+                        .collect()
+                });
+
+                Ok(parsed_path.map(|partition_values| PartitionedFile {
+                    partition_values,
+                    object_meta,
+                    range: None,
+                    extensions: None,
+                }))
+            },
+        )))
     } else {
         // parse the partition values and serde them as a RecordBatch to filter them
         let metas: Vec<_> = list.try_collect().await?;
@@ -317,6 +320,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
                         })
                         .collect(),
                     range: None,
+                    extensions: None,
                 })
             })
         })
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 85d4b6f7d..27cf14122 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -28,6 +28,7 @@ use datafusion_common::ScalarValue;
 use futures::Stream;
 use object_store::{path::Path, ObjectMeta};
 use std::pin::Pin;
+use std::sync::Arc;
 
 pub use self::url::ListingTableUrl;
 pub use table::{ListingOptions, ListingTable, ListingTableConfig};
@@ -58,6 +59,8 @@ pub struct PartitionedFile {
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
+    /// An optional field for user-defined per-object metadata
+    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }
 
 impl PartitionedFile {
@@ -71,6 +74,7 @@ impl PartitionedFile {
             },
             partition_values: vec![],
             range: None,
+            extensions: None,
         }
     }
 
@@ -84,6 +88,7 @@ impl PartitionedFile {
             },
             partition_values: vec![],
             range: Some(FileRange { start, end }),
+            extensions: None,
         }
     }
 }
@@ -94,6 +99,7 @@ impl From<ObjectMeta> for PartitionedFile {
             object_meta,
             partition_values: vec![],
             range: None,
+            extensions: None,
         }
     }
 }
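
`PartitionedFile::extensions` is how per-object, user-defined metadata reaches a
custom reader factory (it is surfaced to `ParquetFileReaderFactory` via the new
`FileMeta` struct). A small sketch of tagging a file; the path, size and tag
value are illustrative:

    use std::sync::Arc;
    use datafusion::datasource::listing::PartitionedFile;

    fn tagged_file(path: &str, size: u64) -> PartitionedFile {
        let mut file = PartitionedFile::new(path.to_string(), size);
        // The new field is Option<Arc<dyn Any + Send + Sync>>; the built-in
        // readers ignore it, and a custom factory can downcast it later.
        file.extensions = Some(Arc::new(String::from("shard-42")));
        file
    }
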
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index dcc508e65..0dd665628 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 #![warn(missing_docs, clippy::needless_borrow)]
+// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
+#![allow(where_clauses_object_safety)]
 
 //! [DataFusion](https://github.com/apache/arrow-datafusion)
 //! is an extensible query execution framework that uses
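
Note that downstream crates implementing `AsyncFileReader` (for example to back
a custom `ParquetFileReaderFactory`) will likely need the same crate-level
allow; the integration test added at the end of this diff does exactly that:

    // Presumably required at the crate root of an implementing crate until
    // https://github.com/apache/arrow-rs/issues/2372 is resolved (#3081).
    #![allow(where_clauses_object_safety)]
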
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index a795a00f9..0b7841d88 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -151,11 +151,11 @@ impl ExecutionPlan for AvroExec {
 #[cfg(feature = "avro")]
 mod private {
     use super::*;
-    use crate::datasource::listing::FileRange;
     use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
+    use crate::physical_plan::file_format::FileMeta;
     use bytes::Buf;
     use futures::StreamExt;
-    use object_store::{GetResult, ObjectMeta, ObjectStore};
+    use object_store::{GetResult, ObjectStore};
 
     pub struct AvroConfig {
         pub schema: SchemaRef,
@@ -185,12 +185,11 @@ mod private {
         fn open(
             &self,
             store: Arc<dyn ObjectStore>,
-            file: ObjectMeta,
-            _range: Option<FileRange>,
-        ) -> FileOpenFuture {
+            file_meta: FileMeta,
+        ) -> Result<FileOpenFuture> {
             let config = self.config.clone();
-            Box::pin(async move {
-                match store.get(&file.location).await? {
+            Ok(Box::pin(async move {
+                match store.get(file_meta.location()).await? {
                     GetResult::File(file, _) => {
                         let reader = config.open(file)?;
                         Ok(futures::stream::iter(reader).boxed())
@@ -201,7 +200,7 @@ mod private {
                         Ok(futures::stream::iter(reader).boxed())
                     }
                 }
-            })
+            }))
         }
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 46e18e27f..885bea870 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -20,21 +20,20 @@
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::{SessionState, TaskContext};
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
-};
-
-use crate::datasource::listing::FileRange;
 use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
+use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+use crate::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+};
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
 use bytes::Buf;
 use futures::{StreamExt, TryStreamExt};
-use object_store::{GetResult, ObjectMeta, ObjectStore};
+use object_store::{GetResult, ObjectStore};
 use std::any::Any;
 use std::fs;
 use std::path::Path;
@@ -201,12 +200,11 @@ impl FileOpener for CsvOpener {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
-        file: ObjectMeta,
-        _range: Option<FileRange>,
-    ) -> FileOpenFuture {
+        file_meta: FileMeta,
+    ) -> Result<FileOpenFuture> {
         let config = self.config.clone();
-        Box::pin(async move {
-            match store.get(&file.location).await? {
+        Ok(Box::pin(async move {
+            match store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
                     Ok(futures::stream::iter(config.open(file, true)).boxed())
                 }
@@ -222,7 +220,7 @@ impl FileOpener for CsvOpener {
                         .boxed())
                 }
             }
-        })
+        }))
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index f095fe28d..1a54d6852 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -32,14 +32,16 @@ use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{ready, FutureExt, Stream, StreamExt};
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::ObjectStore;
 
 use datafusion_common::ScalarValue;
 
-use crate::datasource::listing::{FileRange, PartitionedFile};
+use crate::datasource::listing::PartitionedFile;
 use crate::error::Result;
 use crate::execution::context::TaskContext;
-use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
+use crate::physical_plan::file_format::{
+    FileMeta, FileScanConfig, PartitionColumnProjector,
+};
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
 };
@@ -53,9 +55,8 @@ pub trait FileOpener: Unpin {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
-        file: ObjectMeta,
-        range: Option<FileRange>,
-    ) -> FileOpenFuture;
+        file_meta: FileMeta,
+    ) -> Result<FileOpenFuture>;
 }
 
 /// A stream that iterates record batch by record batch, file over file.
@@ -205,21 +206,30 @@ impl<F: FileOpener> FileStream<F> {
         loop {
             match &mut self.state {
                 FileStreamState::Idle => {
-                    let file = match self.file_iter.pop_front() {
+                    let part_file = match self.file_iter.pop_front() {
                         Some(file) => file,
                         None => return Poll::Ready(None),
                     };
 
+                    let file_meta = FileMeta {
+                        object_meta: part_file.object_meta,
+                        range: part_file.range,
+                        extensions: part_file.extensions,
+                    };
+
                     self.file_stream_metrics.time_opening.start();
-                    let future = self.file_reader.open(
-                        self.object_store.clone(),
-                        file.object_meta,
-                        file.range,
-                    );
-
-                    self.state = FileStreamState::Open {
-                        future,
-                        partition_values: file.partition_values,
+
+                    match self.file_reader.open(self.object_store.clone(), file_meta) {
+                        Ok(future) => {
+                            self.state = FileStreamState::Open {
+                                future,
+                                partition_values: part_file.partition_values,
+                            }
+                        }
+                        Err(e) => {
+                            self.state = FileStreamState::Error;
+                            return Poll::Ready(Some(Err(e.into())));
+                        }
                     }
                 }
                 FileStreamState::Open {
@@ -322,12 +332,11 @@ mod tests {
         fn open(
             &self,
             _store: Arc<dyn ObjectStore>,
-            _file: ObjectMeta,
-            _range: Option<FileRange>,
-        ) -> FileOpenFuture {
+            _file_meta: FileMeta,
+        ) -> Result<FileOpenFuture> {
             let iterator = self.records.clone().into_iter().map(Ok);
             let stream = futures::stream::iter(iterator).boxed();
-            futures::future::ready(Ok(stream)).boxed()
+            Ok(futures::future::ready(Ok(stream)).boxed())
         }
     }
 
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 488de2208..10f148ad0 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -16,9 +16,6 @@
 // under the License.
 
 //! Execution plan for reading line-delimited JSON files
-use arrow::json::reader::DecoderOptions;
-
-use crate::datasource::listing::FileRange;
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::execution::context::TaskContext;
@@ -27,14 +24,16 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
+use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
+use arrow::json::reader::DecoderOptions;
 use arrow::{datatypes::SchemaRef, json};
 use bytes::Buf;
 use futures::{StreamExt, TryStreamExt};
-use object_store::{GetResult, ObjectMeta, ObjectStore};
+use object_store::{GetResult, ObjectStore};
 use std::any::Any;
 use std::fs;
 use std::path::Path;
@@ -163,13 +162,12 @@ impl FileOpener for JsonOpener {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
-        file: ObjectMeta,
-        _range: Option<FileRange>,
-    ) -> FileOpenFuture {
+        file_meta: FileMeta,
+    ) -> Result<FileOpenFuture> {
         let options = self.options.clone();
         let schema = self.file_schema.clone();
-        Box::pin(async move {
-            match store.get(&file.location).await? {
+        Ok(Box::pin(async move {
+            match store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
                     let reader = json::Reader::new(file, schema.clone(), options);
                     Ok(futures::stream::iter(reader).boxed())
@@ -188,7 +186,7 @@ impl FileOpener for JsonOpener {
                         .boxed())
                 }
             }
-        })
+        }))
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index d7d70cd4b..31fa1f2d8 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -29,7 +29,7 @@ mod parquet;
 pub(crate) use self::csv::plan_to_csv;
 pub use self::csv::CsvExec;
 pub(crate) use self::parquet::plan_to_parquet;
-pub use self::parquet::ParquetExec;
+pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
 use arrow::{
     array::{ArrayData, ArrayRef, DictionaryArray},
     buffer::Buffer,
@@ -41,6 +41,7 @@ pub use avro::AvroExec;
 pub(crate) use json::plan_to_json;
 pub use json::NdJsonExec;
 
+use crate::datasource::listing::FileRange;
 use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
 use crate::{
     error::{DataFusionError, Result},
@@ -50,6 +51,8 @@ use arrow::array::{new_null_array, UInt16BufferBuilder};
 use arrow::record_batch::RecordBatchOptions;
 use lazy_static::lazy_static;
 use log::info;
+use object_store::path::Path;
+use object_store::ObjectMeta;
 use std::{
     collections::HashMap,
     fmt::{Display, Formatter, Result as FmtResult},
@@ -401,6 +404,33 @@ fn create_dict_array(
     ))
 }
 
+/// A single file or part of a file that should be read, along with its optional range and user-defined extensions
+pub struct FileMeta {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub object_meta: ObjectMeta,
+    /// An optional file range for a more fine-grained parallel execution
+    pub range: Option<FileRange>,
+    /// An optional field for user-defined per-object metadata
+    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+}
+
+impl FileMeta {
+    /// The full path to the object
+    pub fn location(&self) -> &Path {
+        &self.object_meta.location
+    }
+}
+
+impl From<ObjectMeta> for FileMeta {
+    fn from(object_meta: ObjectMeta) -> Self {
+        Self {
+            object_meta,
+            range: None,
+            extensions: None,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{
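
`FileMeta` is what a `FileOpener` (and `ParquetFileReaderFactory::create_reader`)
now receives instead of a bare `ObjectMeta`. A sketch of building one and
reading the pieces back; the function name and tag are illustrative:

    use std::sync::Arc;
    use datafusion::physical_plan::file_format::FileMeta;
    use object_store::ObjectMeta;

    fn tagged_file_meta(object_meta: ObjectMeta, tag: String) -> FileMeta {
        // From<ObjectMeta> leaves `range` and `extensions` as None
        let mut file_meta = FileMeta::from(object_meta);
        file_meta.extensions = Some(Arc::new(tag));
        // `location()` is shorthand for the object's path
        assert_eq!(file_meta.location(), &file_meta.object_meta.location);
        file_meta
    }
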
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index bd5528c09..e4f62113f 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -24,6 +24,25 @@ use std::ops::Range;
 use std::sync::Arc;
 use std::{any::Any, convert::TryInto};
 
+use crate::datasource::file_format::parquet::fetch_parquet_metadata;
+use crate::datasource::listing::FileRange;
+use crate::physical_plan::file_format::file_stream::{
+    FileOpenFuture, FileOpener, FileStream,
+};
+use crate::physical_plan::file_format::FileMeta;
+use crate::{
+    error::{DataFusionError, Result},
+    execution::context::{SessionState, TaskContext},
+    physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+    physical_plan::{
+        expressions::PhysicalSortExpr,
+        file_format::{FileScanConfig, SchemaAdapter},
+        metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+        DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        Statistics,
+    },
+    scalar::ScalarValue,
+};
 use arrow::datatypes::DataType;
 use arrow::{
     array::ArrayRef,
@@ -31,6 +50,8 @@ use arrow::{
     error::ArrowError,
 };
 use bytes::Bytes;
+use datafusion_common::Column;
+use datafusion_expr::Expr;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use log::debug;
@@ -46,28 +67,6 @@ use parquet::file::{
 };
 use parquet::schema::types::ColumnDescriptor;
 
-use datafusion_common::Column;
-use datafusion_expr::Expr;
-
-use crate::datasource::file_format::parquet::fetch_parquet_metadata;
-use crate::datasource::listing::FileRange;
-use crate::physical_plan::file_format::file_stream::{
-    FileOpenFuture, FileOpener, FileStream,
-};
-use crate::{
-    error::{DataFusionError, Result},
-    execution::context::{SessionState, TaskContext},
-    physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
-    physical_plan::{
-        expressions::PhysicalSortExpr,
-        file_format::{FileScanConfig, SchemaAdapter},
-        metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-        DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-        Statistics,
-    },
-    scalar::ScalarValue,
-};
-
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -80,17 +79,8 @@ pub struct ParquetExec {
     pruning_predicate: Option<PruningPredicate>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
-}
-
-/// Stores metrics about the parquet execution for a particular parquet file
-#[derive(Debug, Clone)]
-struct ParquetFileMetrics {
-    /// Number of times the predicate could not be evaluated
-    pub predicate_evaluation_errors: metrics::Count,
-    /// Number of row groups pruned using
-    pub row_groups_pruned: metrics::Count,
-    /// Total number of bytes scanned
-    pub bytes_scanned: metrics::Count,
+    /// Optional user defined parquet file reader factory
+    parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
 }
 
 impl ParquetExec {
@@ -130,6 +120,7 @@ impl ParquetExec {
             metrics,
             pruning_predicate,
             metadata_size_hint,
+            parquet_file_reader_factory: None,
         }
     }
 
@@ -142,6 +133,35 @@ impl ParquetExec {
     pub fn pruning_predicate(&self) -> Option<&PruningPredicate> {
         self.pruning_predicate.as_ref()
     }
+
+    /// Set an optional user-defined parquet file reader factory.
+    ///
+    /// `ParquetFileReaderFactory` complements `TableProvider`; it enables users to provide a custom
+    /// implementation for data access operations.
+    ///
+    /// If a custom `ParquetFileReaderFactory` is provided, data access operations will be routed
+    /// to this factory instead of `ObjectStore`.
+    pub fn with_parquet_file_reader_factory(
+        mut self,
+        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    ) -> Self {
+        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
+        self
+    }
+}
+
+/// Stores metrics about the parquet execution for a particular parquet file.
+///
+/// This component is subject to **change** in the near future and is exposed for low-level integrations
+/// through [ParquetFileReaderFactory].
+#[derive(Debug, Clone)]
+pub struct ParquetFileMetrics {
+    /// Number of times the predicate could not be evaluated
+    pub predicate_evaluation_errors: metrics::Count,
+    /// Number of row groups pruned using
+    pub row_groups_pruned: metrics::Count,
+    /// Total number of bytes scanned
+    pub bytes_scanned: metrics::Count,
 }
 
 impl ParquetFileMetrics {
@@ -209,27 +229,41 @@ impl ExecutionPlan for ParquetExec {
     fn execute(
         &self,
         partition_index: usize,
-        context: Arc<TaskContext>,
+        ctx: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let projection = match self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
 
+        let parquet_file_reader_factory = self
+            .parquet_file_reader_factory
+            .as_ref()
+            .map(|f| Ok(Arc::clone(f)))
+            .unwrap_or_else(|| {
+                ctx.runtime_env()
+                    .object_store(&self.base_config.object_store_url)
+                    .map(|store| {
+                        Arc::new(DefaultParquetFileReaderFactory::new(store))
+                            as Arc<dyn ParquetFileReaderFactory>
+                    })
+            })?;
+
         let opener = ParquetOpener {
             partition_index,
             projection: Arc::from(projection),
-            batch_size: context.session_config().batch_size(),
+            batch_size: ctx.session_config().batch_size(),
             pruning_predicate: self.pruning_predicate.clone(),
             table_schema: self.base_config.file_schema.clone(),
             metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics.clone(),
+            parquet_file_reader_factory,
         };
 
         let stream = FileStream::new(
             &self.base_config,
             partition_index,
-            context,
+            ctx,
             opener,
             self.metrics.clone(),
         )?;
@@ -284,34 +318,37 @@ struct ParquetOpener {
     table_schema: SchemaRef,
     metadata_size_hint: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
+    parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
 }
 
 impl FileOpener for ParquetOpener {
     fn open(
         &self,
-        store: Arc<dyn ObjectStore>,
-        meta: ObjectMeta,
-        range: Option<FileRange>,
-    ) -> FileOpenFuture {
+        _: Arc<dyn ObjectStore>,
+        file_meta: FileMeta,
+    ) -> Result<FileOpenFuture> {
+        let file_range = file_meta.range.clone();
+
         let metrics = ParquetFileMetrics::new(
             self.partition_index,
-            meta.location.as_ref(),
+            file_meta.location().as_ref(),
             &self.metrics,
         );
 
-        let reader = ParquetFileReader {
-            store,
-            meta,
-            metadata_size_hint: self.metadata_size_hint,
-            metrics: metrics.clone(),
-        };
+        let reader =
+            BoxedAsyncFileReader(self.parquet_file_reader_factory.create_reader(
+                self.partition_index,
+                file_meta,
+                self.metadata_size_hint,
+                &self.metrics,
+            )?);
 
         let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
         let pruning_predicate = self.pruning_predicate.clone();
 
-        Box::pin(async move {
+        Ok(Box::pin(async move {
             let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
             let adapted_projections =
                 schema_adapter.map_projections(builder.schema(), &projection)?;
@@ -322,7 +359,8 @@ impl FileOpener for ParquetOpener {
             );
 
             let groups = builder.metadata().row_groups();
-            let row_groups = prune_row_groups(groups, range, pruning_predicate, &metrics);
+            let row_groups =
+                prune_row_groups(groups, file_range, pruning_predicate, &metrics);
 
             let stream = builder
                 .with_projection(mask)
@@ -341,7 +379,32 @@ impl FileOpener for ParquetOpener {
                 });
 
             Ok(adapted.boxed())
-        })
+        }))
+    }
+}
+
+/// Factory of parquet file readers.
+///
+/// Provides a means to implement a custom data access interface.
+pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
+    /// Provides an `AsyncFileReader` for the parquet file specified in `FileMeta`
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Result<Box<dyn AsyncFileReader + Send>>;
+}
+
+#[derive(Debug)]
+pub struct DefaultParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+}
+
+impl DefaultParquetFileReaderFactory {
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self { store }
     }
 }
 
@@ -349,8 +412,8 @@ impl FileOpener for ParquetOpener {
 struct ParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
-    metadata_size_hint: Option<usize>,
     metrics: ParquetFileMetrics,
+    metadata_size_hint: Option<usize>,
 }
 
 impl AsyncFileReader for ParquetFileReader {
@@ -389,6 +452,63 @@ impl AsyncFileReader for ParquetFileReader {
     }
 }
 
+impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Result<Box<dyn AsyncFileReader + Send>> {
+        let parquet_file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+
+        Ok(Box::new(ParquetFileReader {
+            meta: file_meta.object_meta,
+            store: Arc::clone(&self.store),
+            metadata_size_hint,
+            metrics: parquet_file_metrics,
+        }))
+    }
+}
+
+///
+/// `BoxedAsyncFileReader` has been created to satisfy the type requirements of
+/// the parquet stream builder constructor.
+///
+/// Temporary pending https://github.com/apache/arrow-rs/pull/2368
+struct BoxedAsyncFileReader(Box<dyn AsyncFileReader + Send>);
+
+impl AsyncFileReader for BoxedAsyncFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, ::parquet::errors::Result<Bytes>> {
+        self.0.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    // TODO: This where bound forces us to enable #![allow(where_clauses_object_safety)] (#3081)
+    // Upstream issue https://github.com/apache/arrow-rs/issues/2372
+    where
+        Self: Send,
+    {
+        self.0.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> {
+        self.0.get_metadata()
+    }
+}
+
 /// Wraps parquet statistics in a way
 /// that implements [`PruningStatistics`]
 struct RowGroupPruningStatistics<'a> {
@@ -652,12 +772,6 @@ pub async fn plan_to_parquet(
 
 #[cfg(test)]
 mod tests {
-    use crate::{
-        assert_batches_sorted_eq, assert_contains,
-        datasource::file_format::{parquet::ParquetFormat, FileFormat},
-        physical_plan::collect,
-    };
-
     use super::*;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
@@ -666,6 +780,11 @@ mod tests {
     use crate::execution::options::CsvReadOptions;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
+    use crate::{
+        assert_batches_sorted_eq, assert_contains,
+        datasource::file_format::{parquet::ParquetFormat, FileFormat},
+        physical_plan::collect,
+    };
     use arrow::array::Float32Array;
     use arrow::record_batch::RecordBatch;
     use arrow::{
@@ -1086,6 +1205,7 @@ mod tests {
                 object_meta: meta.clone(),
                 partition_values: vec![],
                 range: Some(FileRange { start, end }),
+                extensions: None,
             }
         }
 
@@ -1188,6 +1308,7 @@ mod tests {
                 ScalarValue::Utf8(Some("26".to_owned())),
             ],
             range: None,
+            extensions: None,
         };
 
         let parquet_exec = ParquetExec::new(
@@ -1250,6 +1371,7 @@ mod tests {
             },
             partition_values: vec![],
             range: None,
+            extensions: None,
         };
 
         let parquet_exec = ParquetExec::new(
diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
new file mode 100644
index 000000000..ac8c98381
--- /dev/null
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -0,0 +1,261 @@
+// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
+#![allow(where_clauses_object_safety)]
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};
+    use arrow::datatypes::{Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use bytes::Bytes;
+    use datafusion::assert_batches_sorted_eq;
+    use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
+    use datafusion::datasource::listing::PartitionedFile;
+    use datafusion::datasource::object_store::ObjectStoreUrl;
+    use datafusion::physical_plan::file_format::{
+        FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics,
+        ParquetFileReaderFactory,
+    };
+    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+    use datafusion::physical_plan::{collect, Statistics};
+    use datafusion::prelude::SessionContext;
+    use datafusion_common::DataFusionError;
+    use futures::future::BoxFuture;
+    use futures::{FutureExt, TryFutureExt};
+    use object_store::memory::InMemory;
+    use object_store::path::Path;
+    use object_store::{ObjectMeta, ObjectStore};
+    use parquet::arrow::async_reader::AsyncFileReader;
+    use parquet::arrow::ArrowWriter;
+    use parquet::errors::ParquetError;
+    use parquet::file::metadata::ParquetMetaData;
+    use std::io::Cursor;
+    use std::ops::Range;
+    use std::sync::Arc;
+    use std::time::SystemTime;
+
+    const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
+
+    #[tokio::test]
+    async fn route_data_access_ops_to_parquet_file_reader_factory() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        let batch = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        let file_schema = batch.schema().clone();
+        let (in_memory_object_store, parquet_files_meta) =
+            store_parquet_in_memory(vec![batch]).await;
+        let file_groups = parquet_files_meta
+            .into_iter()
+            .map(|meta| PartitionedFile {
+                object_meta: meta,
+                partition_values: vec![],
+                range: None,
+                extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
+            })
+            .collect();
+
+        // prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                // just any url that doesn't point to in memory object store
+                object_store_url: ObjectStoreUrl::local_filesystem(),
+                file_groups: vec![file_groups],
+                file_schema,
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+            None,
+        )
+        .with_parquet_file_reader_factory(Arc::new(
+            InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)),
+        ));
+
+        let session_ctx = SessionContext::new();
+
+        let task_ctx = session_ctx.task_ctx();
+        let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[derive(Debug)]
+    struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
+
+    impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
+        fn create_reader(
+            &self,
+            partition_index: usize,
+            file_meta: FileMeta,
+            metadata_size_hint: Option<usize>,
+            metrics: &ExecutionPlanMetricsSet,
+        ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> {
+            let metadata = file_meta
+                .extensions
+                .as_ref()
+                .expect("has user defined metadata");
+            let metadata = metadata
+                .downcast_ref::<String>()
+                .expect("has string metadata");
+
+            assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
+
+            let parquet_file_metrics = ParquetFileMetrics::new(
+                partition_index,
+                file_meta.location().as_ref(),
+                metrics,
+            );
+
+            Ok(Box::new(ParquetFileReader {
+                store: Arc::clone(&self.0),
+                meta: file_meta.object_meta,
+                metrics: parquet_file_metrics,
+                metadata_size_hint,
+            }))
+        }
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+        )
+    }
+
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+    }
+
+    async fn store_parquet_in_memory(
+        batches: Vec<RecordBatch>,
+    ) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
+        let in_memory = InMemory::new();
+
+        let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
+            .into_iter()
+            .enumerate()
+            .map(|(offset, batch)| {
+                let mut buf = Vec::<u8>::with_capacity(32 * 1024);
+                let mut output = Cursor::new(&mut buf);
+
+                let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+
+                let meta = ObjectMeta {
+                    location: Path::parse(format!("file-{offset}.parquet"))
+                        .expect("creating path"),
+                    last_modified: chrono::DateTime::from(SystemTime::now()),
+                    size: buf.len(),
+                };
+
+                (meta, Bytes::from(buf))
+            })
+            .collect();
+
+        let mut objects = Vec::with_capacity(parquet_batches.len());
+        for (meta, bytes) in parquet_batches {
+            in_memory
+                .put(&meta.location, bytes)
+                .await
+                .expect("put parquet file into in memory object store");
+            objects.push(meta);
+        }
+
+        (Arc::new(in_memory), objects)
+    }
+
+    /// Implements [`AsyncFileReader`] for a parquet file in object storage
+    struct ParquetFileReader {
+        store: Arc<dyn ObjectStore>,
+        meta: ObjectMeta,
+        metrics: ParquetFileMetrics,
+        metadata_size_hint: Option<usize>,
+    }
+
+    impl AsyncFileReader for ParquetFileReader {
+        fn get_bytes(
+            &mut self,
+            range: Range<usize>,
+        ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+            self.metrics.bytes_scanned.add(range.end - range.start);
+
+            self.store
+                .get_range(&self.meta.location, range)
+                .map_err(|e| {
+                    ParquetError::General(format!(
+                        "AsyncChunkReader::get_bytes error: {}",
+                        e
+                    ))
+                })
+                .boxed()
+        }
+
+        fn get_metadata(
+            &mut self,
+        ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+            Box::pin(async move {
+                let metadata = fetch_parquet_metadata(
+                    self.store.as_ref(),
+                    &self.meta,
+                    self.metadata_size_hint,
+                )
+                .await
+                .map_err(|e| {
+                    ParquetError::General(format!(
+                        "AsyncChunkReader::get_metadata error: {}",
+                        e
+                    ))
+                })?;
+                Ok(Arc::new(metadata))
+            })
+        }
+    }
+}