You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/01 16:40:54 UTC

[arrow-datafusion] branch master updated: Expose remaining parquet config options into ConfigOptions (try 2) (#4427)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new fb8eeb2ac Expose remaining parquet config options into ConfigOptions (try 2) (#4427)
fb8eeb2ac is described below

commit fb8eeb2ac80adeb872a32fc95cb29dbc17f0de60
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Dec 1 11:40:48 2022 -0500

    Expose remaining parquet config options into ConfigOptions (try 2) (#4427)
    
    * Expose remaining parquet config options into ConfigOptions (try 2)
    
    * fix: doctests
    
    * Update config doc
    
    * Update docs/source/user-guide/configs.md
    
    Co-authored-by: Dan Harris <13...@users.noreply.github.com>
    
    * Update configuration docs
    
    Co-authored-by: Dan Harris <13...@users.noreply.github.com>
---
 benchmarks/src/bin/tpch.rs                         |   3 +-
 datafusion-examples/examples/flight_server.rs      |   4 +-
 .../examples/parquet_sql_multiple_files.rs         |   3 +-
 datafusion/core/src/config.rs                      |  39 ++++++
 .../core/src/datasource/file_format/parquet.rs     | 133 ++++++++++++++-------
 datafusion/core/src/datasource/listing/table.rs    |  93 ++++++++------
 .../core/src/datasource/listing_table_factory.rs   |   2 +-
 datafusion/core/src/execution/context.rs           |  42 ++++---
 datafusion/core/src/execution/options.rs           |  36 +++---
 .../core/src/physical_plan/file_format/parquet.rs  |  74 +++++++-----
 datafusion/core/tests/parquet/page_pruning.rs      |   2 +-
 datafusion/core/tests/path_partition.rs            |   2 +-
 datafusion/core/tests/row.rs                       |  19 ++-
 datafusion/core/tests/sql/information_schema.rs    |   3 +
 datafusion/core/tests/sql/parquet.rs               |   6 +-
 datafusion/proto/src/logical_plan.rs               |   3 +-
 docs/source/user-guide/configs.md                  |   3 +
 17 files changed, 315 insertions(+), 152 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 57c9c5780..4bc37f0ae 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -396,7 +396,8 @@ async fn get_table(
             }
             "parquet" => {
                 let path = format!("{}/{}", path, table);
-                let format = ParquetFormat::default().with_enable_pruning(true);
+                let format = ParquetFormat::new(ctx.config_options())
+                    .with_enable_pruning(Some(true));
 
                 (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
             }
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 88cc6f1c2..d10f11a43 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -67,7 +67,9 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<SchemaResult>, Status> {
         let request = request.into_inner();
 
-        let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let config = SessionConfig::new();
+        let listing_options =
+            ListingOptions::new(Arc::new(ParquetFormat::new(config.config_options())));
         let table_path =
             ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
 
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 95bcd4853..7e2038c8a 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -32,7 +32,8 @@ async fn main() -> Result<()> {
     let testdata = datafusion::test_util::parquet_test_data();
 
     // Configure listing options
-    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let file_format =
+        ParquetFormat::new(ctx.config_options()).with_enable_pruning(Some(true));
     let listing_options = ListingOptions::new(Arc::new(file_format))
         .with_file_extension(FileType::PARQUET.get_ext());
 
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 620634f25..c738bc0cf 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -61,6 +61,16 @@ pub const OPT_PARQUET_REORDER_FILTERS: &str =
 pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
     "datafusion.execution.parquet.enable_page_index";
 
+/// Configuration option "datafusion.execution.parquet.pruning"
+pub const OPT_PARQUET_ENABLE_PRUNING: &str = "datafusion.execution.parquet.pruning";
+
+/// Configuration option "datafusion.execution.parquet.skip_metadata"
+pub const OPT_PARQUET_SKIP_METADATA: &str = "datafusion.execution.parquet.skip_metadata";
+
+/// Configuration option "datafusion.execution.parquet.metadata_size_hint"
+pub const OPT_PARQUET_METADATA_SIZE_HINT: &str =
+    "datafusion.execution.parquet.metadata_size_hint";
+
 /// Configuration option "datafusion.optimizer.skip_failed_rules"
 pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
     "datafusion.optimizer.skip_failed_rules";
@@ -255,6 +265,29 @@ impl BuiltInConfigs {
                  to reduce the number of rows decoded.",
                 false,
             ),
+            ConfigDefinition::new_bool(
+                OPT_PARQUET_ENABLE_PRUNING,
+                "If true, the parquet reader attempts to skip entire row groups based \
+                 on the predicate in the query and the metadata (min/max values) stored in \
+                 the parquet file.",
+                true,
+            ),
+            ConfigDefinition::new_bool(
+                OPT_PARQUET_SKIP_METADATA,
+                "If true, the parquet reader skip the optional embedded metadata that may be in \
+                the file Schema. This setting can help avoid schema conflicts when querying \
+                multiple parquet files with schemas containing compatible types but different metadata.",
+                true,
+            ),
+            ConfigDefinition::new(
+                OPT_PARQUET_METADATA_SIZE_HINT,
+                "If specified, the parquet reader will try and fetch the last `size_hint` \
+                 bytes of the parquet file optimistically. If not specified, two read are required: \
+                 One read to fetch the 8-byte parquet footer and  \
+                 another to fetch the metadata length encoded in the footer.",
+                DataType::UInt64,
+                ScalarValue::UInt64(None),
+            ),
             ConfigDefinition::new_bool(
                 OPT_OPTIMIZER_SKIP_FAILED_RULES,
                 "When set to true, the logical plan optimizer will produce warning \
@@ -424,6 +457,12 @@ impl ConfigOptions {
         get_conf_value!(self, UInt64, key, "u64")
     }
 
+    /// get a u64 configuration option as a usize
+    pub fn get_usize(&self, key: &str) -> Option<usize> {
+        let v = get_conf_value!(self, UInt64, key, "usize");
+        v.and_then(|v| v.try_into().ok())
+    }
+
     /// get a string configuration option
     pub fn get_string(&self, key: &str) -> Option<String> {
         get_conf_value!(self, Utf8, key, "string")
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index fa9ab13cd..6e1fa1782 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -28,6 +28,7 @@ use datafusion_common::DataFusionError;
 use datafusion_optimizer::utils::conjunction;
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
+use parking_lot::RwLock;
 use parquet::arrow::parquet_to_arrow_schema;
 use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
@@ -39,6 +40,10 @@ use crate::arrow::array::{
     BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
 };
 use crate::arrow::datatypes::{DataType, Field};
+use crate::config::ConfigOptions;
+use crate::config::OPT_PARQUET_ENABLE_PRUNING;
+use crate::config::OPT_PARQUET_METADATA_SIZE_HINT;
+use crate::config::OPT_PARQUET_SKIP_METADATA;
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::logical_expr::Expr;
@@ -52,51 +57,69 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
 /// The Apache Parquet `FileFormat` implementation
 #[derive(Debug)]
 pub struct ParquetFormat {
-    enable_pruning: bool,
+    // Session level configuration
+    config_options: Arc<RwLock<ConfigOptions>>,
+    // local overides
+    enable_pruning: Option<bool>,
     metadata_size_hint: Option<usize>,
-    skip_metadata: bool,
+    skip_metadata: Option<bool>,
 }
 
-impl Default for ParquetFormat {
-    fn default() -> Self {
+impl ParquetFormat {
+    /// construct a new Format with the specified `ConfigOptions`
+    pub fn new(config_options: Arc<RwLock<ConfigOptions>>) -> Self {
         Self {
-            enable_pruning: true,
+            config_options,
+            enable_pruning: None,
             metadata_size_hint: None,
-            skip_metadata: true,
+            skip_metadata: None,
         }
     }
-}
 
-impl ParquetFormat {
     /// Activate statistics based row group level pruning
-    /// - defaults to true
-    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
+    /// - If None, defaults to value on `config_options`
+    pub fn with_enable_pruning(mut self, enable: Option<bool>) -> Self {
         self.enable_pruning = enable;
         self
     }
 
+    /// Return true if pruning is enabled
+    pub fn enable_pruning(&self) -> bool {
+        self.enable_pruning
+            .or_else(|| {
+                self.config_options
+                    .read()
+                    .get_bool(OPT_PARQUET_ENABLE_PRUNING)
+            })
+            .unwrap_or(true)
+    }
+
     /// Provide a hint to the size of the file metadata. If a hint is provided
     /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
     /// With out a hint, two read are required. One read to fetch the 8-byte parquet footer and then
     /// another read to fetch the metadata length encoded in the footer.
-    pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self {
-        self.metadata_size_hint = Some(size_hint);
+    ///
+    /// - If None, defaults to value on `config_options`
+    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
+        self.metadata_size_hint = size_hint;
         self
     }
-    /// Return true if pruning is enabled
-    pub fn enable_pruning(&self) -> bool {
-        self.enable_pruning
-    }
 
     /// Return the metadata size hint if set
     pub fn metadata_size_hint(&self) -> Option<usize> {
-        self.metadata_size_hint
+        self.metadata_size_hint.or_else(|| {
+            self.config_options
+                .read()
+                .get_usize(OPT_PARQUET_METADATA_SIZE_HINT)
+        })
     }
 
     /// Tell the parquet reader to skip any metadata that may be in
     /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata.  Defaults to true.
-    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
+    /// metadata.
+    ///
+    /// - If None, defaults to value on `config_options`
+    pub fn with_skip_metadata(mut self, skip_metadata: Option<bool>) -> Self {
         self.skip_metadata = skip_metadata;
         self
     }
@@ -105,6 +128,12 @@ impl ParquetFormat {
     /// schema merging.
     pub fn skip_metadata(&self) -> bool {
         self.skip_metadata
+            .or_else(|| {
+                self.config_options
+                    .read()
+                    .get_bool(OPT_PARQUET_SKIP_METADATA)
+            })
+            .unwrap_or(true)
     }
 }
 
@@ -143,7 +172,7 @@ impl FileFormat for ParquetFormat {
             schemas.push(schema)
         }
 
-        let schema = if self.skip_metadata {
+        let schema = if self.skip_metadata() {
             Schema::try_merge(clear_metadata(schemas))
         } else {
             Schema::try_merge(schemas)
@@ -176,7 +205,7 @@ impl FileFormat for ParquetFormat {
         // If enable pruning then combine the filters to build the predicate.
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
-        let predicate = if self.enable_pruning {
+        let predicate = if self.enable_pruning() {
             conjunction(filters.to_vec())
         } else {
             None
@@ -620,7 +649,8 @@ mod tests {
         let store = Arc::new(LocalFileSystem::new()) as _;
         let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
-        let format = ParquetFormat::default();
+        let ctx = SessionContext::new();
+        let format = ParquetFormat::new(ctx.config_options());
         let schema = format.infer_schema(&store, &meta).await.unwrap();
 
         let stats =
@@ -767,7 +797,9 @@ mod tests {
 
         assert_eq!(store.request_count(), 2);
 
-        let format = ParquetFormat::default().with_metadata_size_hint(9);
+        let ctx = SessionContext::new();
+        let format =
+            ParquetFormat::new(ctx.config_options()).with_metadata_size_hint(Some(9));
         let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
 
         let stats =
@@ -794,7 +826,9 @@ mod tests {
         // ensure the requests were coalesced into a single request
         assert_eq!(store.request_count(), 1);
 
-        let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
+        let ctx = SessionContext::new();
+        let format = ParquetFormat::new(ctx.config_options())
+            .with_metadata_size_hint(Some(size_hint));
         let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
         let stats = fetch_statistics(
             store.upcast().as_ref(),
@@ -831,7 +865,7 @@ mod tests {
         let config = SessionConfig::new().with_batch_size(2);
         let ctx = SessionContext::with_config(config);
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
         let task_ctx = ctx.task_ctx();
         let stream = exec.execute(0, task_ctx)?;
 
@@ -860,11 +894,12 @@ mod tests {
 
         // Read the full file
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
 
         // Read only one column. This should scan less data.
         let projection = Some(vec![0]);
-        let exec_projected = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec_projected =
+            get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let task_ctx = ctx.task_ctx();
 
@@ -882,7 +917,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, Some(1)).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, Some(1)).await?;
 
         // note: even if the limit is set, the executor rounds up to the batch size
         assert_eq!(exec.statistics().num_rows, Some(8));
@@ -901,7 +937,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -939,7 +976,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -965,7 +1003,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -988,7 +1027,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1011,7 +1051,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1037,7 +1078,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1063,7 +1105,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1090,7 +1133,7 @@ mod tests {
         let task_ctx = session_ctx.task_ctx();
 
         // parquet use the int32 as the physical type to store decimal
-        let exec = get_exec("int32_decimal.parquet", None, None).await?;
+        let exec = get_exec(&session_ctx, "int32_decimal.parquet", None, None).await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
@@ -1098,7 +1141,7 @@ mod tests {
         assert_eq!(&DataType::Decimal128(4, 2), column.data_type());
 
         // parquet use the int64 as the physical type to store decimal
-        let exec = get_exec("int64_decimal.parquet", None, None).await?;
+        let exec = get_exec(&session_ctx, "int64_decimal.parquet", None, None).await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
@@ -1106,14 +1149,21 @@ mod tests {
         assert_eq!(&DataType::Decimal128(10, 2), column.data_type());
 
         // parquet use the fixed length binary as the physical type to store decimal
-        let exec = get_exec("fixed_length_decimal.parquet", None, None).await?;
+        let exec =
+            get_exec(&session_ctx, "fixed_length_decimal.parquet", None, None).await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
         let column = batches[0].column(0);
         assert_eq!(&DataType::Decimal128(25, 2), column.data_type());
 
-        let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?;
+        let exec = get_exec(
+            &session_ctx,
+            "fixed_length_decimal_legacy.parquet",
+            None,
+            None,
+        )
+        .await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
@@ -1123,7 +1173,7 @@ mod tests {
         // parquet use the fixed length binary as the physical type to store decimal
         // TODO: arrow-rs don't support convert the physical type of binary to decimal
         // https://github.com/apache/arrow-rs/pull/2160
-        // let exec = get_exec("byte_array_decimal.parquet", None, None).await?;
+        // let exec = get_exec(&session_ctx, "byte_array_decimal.parquet", None, None).await?;
 
         Ok(())
     }
@@ -1207,12 +1257,13 @@ mod tests {
     }
 
     async fn get_exec(
+        ctx: &SessionContext,
         file_name: &str,
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::parquet_test_data();
-        let format = ParquetFormat::default();
+        let format = ParquetFormat::new(ctx.config_options());
         scan_format(&format, &testdata, file_name, projection, limit).await
     }
 }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 374699065..4ca044f0b 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -28,7 +28,9 @@ use datafusion_physical_expr::PhysicalSortExpr;
 use futures::{future, stream, StreamExt, TryStreamExt};
 use object_store::path::Path;
 use object_store::ObjectMeta;
+use parking_lot::RwLock;
 
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::{
     file_format::{
@@ -107,7 +109,10 @@ impl ListingTableConfig {
         }
     }
 
-    fn infer_format(path: &str) -> Result<(Arc<dyn FileFormat>, String)> {
+    fn infer_format(
+        config_options: Arc<RwLock<ConfigOptions>>,
+        path: &str,
+    ) -> Result<(Arc<dyn FileFormat>, String)> {
         let err_msg = format!("Unable to infer file type from path: {}", path);
 
         let mut exts = path.rsplit('.');
@@ -136,15 +141,15 @@ impl ListingTableConfig {
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
             ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::PARQUET => Arc::new(ParquetFormat::new(config_options)),
         };
 
         Ok((file_format, ext))
     }
 
     /// Infer `ListingOptions` based on `table_path` suffix.
-    pub async fn infer_options(self, ctx: &SessionState) -> Result<Self> {
-        let store = ctx
+    pub async fn infer_options(self, state: &SessionState) -> Result<Self> {
+        let store = state
             .runtime_env
             .object_store(self.table_paths.get(0).unwrap())?;
 
@@ -157,12 +162,14 @@ impl ListingTableConfig {
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
 
-        let (format, file_extension) =
-            ListingTableConfig::infer_format(file.location.as_ref())?;
+        let (format, file_extension) = ListingTableConfig::infer_format(
+            state.config_options(),
+            file.location.as_ref(),
+        )?;
 
         let listing_options = ListingOptions::new(format)
             .with_file_extension(file_extension)
-            .with_target_partitions(ctx.config.target_partitions);
+            .with_target_partitions(state.config.target_partitions);
 
         Ok(Self {
             table_paths: self.table_paths,
@@ -251,11 +258,15 @@ impl ListingOptions {
     /// Set file extension on [`ListingOptions`] and returns self.
     ///
     /// ```
-    /// use std::sync::Arc;
-    /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
+    /// # use std::sync::Arc;
+    /// # use datafusion::prelude::SessionContext;
+    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
     ///
-    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
-    ///     .with_file_extension(".parquet");
+    /// let ctx = SessionContext::new();
+    /// let listing_options = ListingOptions::new(Arc::new(
+    ///     ParquetFormat::new(ctx.config_options())
+    ///   ))
+    ///   .with_file_extension(".parquet");
     ///
     /// assert_eq!(listing_options.file_extension, ".parquet");
     /// ```
@@ -267,13 +278,17 @@ impl ListingOptions {
     /// Set table partition column names on [`ListingOptions`] and returns self.
     ///
     /// ```
-    /// use std::sync::Arc;
-    /// use arrow::datatypes::DataType;
-    /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
+    /// # use std::sync::Arc;
+    /// # use arrow::datatypes::DataType;
+    /// # use datafusion::prelude::{col, SessionContext};
+    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
     ///
-    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
-    ///     .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
-    ///         ("col_b".to_string(), DataType::Utf8)]);
+    /// let ctx = SessionContext::new();
+    /// let listing_options = ListingOptions::new(Arc::new(
+    ///     ParquetFormat::new(ctx.config_options())
+    ///   ))
+    ///   .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
+    ///       ("col_b".to_string(), DataType::Utf8)]);
     ///
     /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
     ///     ("col_b".to_string(), DataType::Utf8)]);
@@ -289,12 +304,15 @@ impl ListingOptions {
     /// Set stat collection on [`ListingOptions`] and returns self.
     ///
     /// ```
-    /// use std::sync::Arc;
-    /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
+    /// # use std::sync::Arc;
+    /// # use datafusion::prelude::SessionContext;
+    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
     ///
-    /// // Enable stat collection
-    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
-    ///     .with_collect_stat(true);
+    /// let ctx = SessionContext::new();
+    /// let listing_options = ListingOptions::new(Arc::new(
+    ///     ParquetFormat::new(ctx.config_options())
+    ///   ))
+    ///   .with_collect_stat(true);
     ///
     /// assert_eq!(listing_options.collect_stat, true);
     /// ```
@@ -306,11 +324,15 @@ impl ListingOptions {
     /// Set number of target partitions on [`ListingOptions`] and returns self.
     ///
     /// ```
-    /// use std::sync::Arc;
-    /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
+    /// # use std::sync::Arc;
+    /// # use datafusion::prelude::SessionContext;
+    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
     ///
-    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
-    ///     .with_target_partitions(8);
+    /// let ctx = SessionContext::new();
+    /// let listing_options = ListingOptions::new(Arc::new(
+    ///     ParquetFormat::new(ctx.config_options())
+    ///   ))
+    ///   .with_target_partitions(8);
     ///
     /// assert_eq!(listing_options.target_partitions, 8);
     /// ```
@@ -322,18 +344,20 @@ impl ListingOptions {
     /// Set file sort order on [`ListingOptions`] and returns self.
     ///
     /// ```
-    /// use std::sync::Arc;
-    /// use datafusion::prelude::{Expr, col};
-    /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
-    ///
+    /// # use std::sync::Arc;
+    /// # use datafusion::prelude::{col, SessionContext};
+    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
     ///
     ///  // Tell datafusion that the files are sorted by column "a"
     ///  let file_sort_order = Some(vec![
     ///    col("a").sort(true, true)
     ///  ]);
     ///
-    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
-    ///     .with_file_sort_order(file_sort_order.clone());
+    /// let ctx = SessionContext::new();
+    /// let listing_options = ListingOptions::new(Arc::new(
+    ///     ParquetFormat::new(ctx.config_options())
+    ///   ))
+    ///   .with_file_sort_order(file_sort_order.clone());
     ///
     /// assert_eq!(listing_options.file_sort_order, file_sort_order);
     /// ```
@@ -709,7 +733,7 @@ mod tests {
         let ctx = SessionContext::new();
         let state = ctx.state();
 
-        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())));
         let schema = opt.infer_schema(&state, &table_path).await?;
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
@@ -731,7 +755,8 @@ mod tests {
 
         let ctx = SessionContext::new();
         let state = ctx.state();
-        let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let options =
+            ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())));
         let schema = options.infer_schema(&state, &table_path).await.unwrap();
 
         use physical_plan::expressions::col as physical_col;
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 9a38e1768..537288646 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -82,7 +82,7 @@ impl TableProviderFactory for ListingTableFactory {
                     .with_delimiter(cmd.delimiter as u8)
                     .with_file_compression_type(file_compression_type),
             ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::PARQUET => Arc::new(ParquetFormat::new(state.config_options())),
             FileType::AVRO => Arc::new(AvroFormat::default()),
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 2dd270459..6b6a4fb1f 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -21,6 +21,7 @@ use crate::{
         catalog::{CatalogList, MemoryCatalogList},
         information_schema::CatalogWithInformationSchema,
     },
+    config::OPT_PARQUET_ENABLE_PRUNING,
     datasource::listing::{ListingOptions, ListingTable},
     datasource::{MemTable, ViewTable},
     logical_expr::{PlanType, ToStringifiedPlan},
@@ -228,6 +229,11 @@ impl SessionContext {
         self.state.read().runtime_env.clone()
     }
 
+    /// Return a handle to the shared configuration options
+    pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
+        self.state.read().config_options()
+    }
+
     /// Return the session_id of this Session
     pub fn session_id(&self) -> String {
         self.session_id.clone()
@@ -693,9 +699,7 @@ impl SessionContext {
         options: ParquetReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let target_partitions = self.copied_config().target_partitions;
-
-        let listing_options = options.to_listing_options(target_partitions);
+        let listing_options = options.to_listing_options(&self.state.read().config);
 
         // with parquet we resolve the schema in all cases
         let resolved_schema = listing_options
@@ -814,13 +818,7 @@ impl SessionContext {
         table_path: &str,
         options: ParquetReadOptions<'_>,
     ) -> Result<()> {
-        let (target_partitions, parquet_pruning) = {
-            let conf = self.copied_config();
-            (conf.target_partitions, conf.parquet_pruning)
-        };
-        let listing_options = options
-            .parquet_pruning(parquet_pruning)
-            .to_listing_options(target_partitions);
+        let listing_options = options.to_listing_options(&self.state.read().config);
 
         self.register_listing_table(name, table_path, listing_options, None, None)
             .await?;
@@ -1162,8 +1160,6 @@ pub struct SessionConfig {
     /// Should DataFusion repartition data using the partition keys to execute window functions in
     /// parallel using the provided `target_partitions` level
     pub repartition_windows: bool,
-    /// Should DataFusion parquet reader using the predicate to prune data
-    pub parquet_pruning: bool,
     /// Should DataFusion collect statistics after listing files
     pub collect_statistics: bool,
     /// Configuration options
@@ -1183,7 +1179,6 @@ impl Default for SessionConfig {
             repartition_joins: true,
             repartition_aggregations: true,
             repartition_windows: true,
-            parquet_pruning: true,
             collect_statistics: false,
             config_options: Arc::new(RwLock::new(ConfigOptions::new())),
             // Assume no extensions by default.
@@ -1287,11 +1282,21 @@ impl SessionConfig {
     }
 
     /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
-    pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
-        self.parquet_pruning = enabled;
+    pub fn with_parquet_pruning(self, enabled: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_PARQUET_ENABLE_PRUNING, enabled);
         self
     }
 
+    /// Returns true if pruning predicate should be used to skip parquet row groups
+    pub fn parquet_pruning(&self) -> bool {
+        self.config_options
+            .read()
+            .get_bool(OPT_PARQUET_ENABLE_PRUNING)
+            .unwrap_or(false)
+    }
+
     /// Enables or disables the collection of statistics after listing files
     pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
         self.collect_statistics = enabled;
@@ -1339,7 +1344,7 @@ impl SessionConfig {
         );
         map.insert(
             PARQUET_PRUNING.to_owned(),
-            format!("{}", self.parquet_pruning),
+            format!("{}", self.parquet_pruning()),
         );
         map.insert(
             COLLECT_STATISTICS.to_owned(),
@@ -1733,6 +1738,11 @@ impl SessionState {
         let logical_plan = self.optimize(logical_plan)?;
         planner.create_physical_plan(&logical_plan, self).await
     }
+
+    /// return the configuration options
+    pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
+        self.config.config_options()
+    }
 }
 
 impl ContextProvider for SessionState {
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index 17f59ec7f..430d10862 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -34,6 +34,8 @@ use crate::datasource::{
     listing::ListingOptions,
 };
 
+use super::context::SessionConfig;
+
 /// Options that control the reading of CSV files.
 ///
 /// Note this structure is supplied when a datasource is created and
@@ -167,26 +169,24 @@ pub struct ParquetReadOptions<'a> {
     pub file_extension: &'a str,
     /// Partition Columns
     pub table_partition_cols: Vec<(String, DataType)>,
-    /// Should DataFusion parquet reader use the predicate to prune data,
-    /// overridden by value on execution::context::SessionConfig
-    // TODO move this into ConfigOptions
-    pub parquet_pruning: bool,
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata.  Defaults to true.
-    // TODO move this into ConfigOptions
-    pub skip_metadata: bool,
+    /// Should the parquet reader use the predicate to prune row groups?
+    /// If None, uses value in SessionConfig
+    pub parquet_pruning: Option<bool>,
+    /// Should the parquet reader to skip any metadata that may be in
+    /// the file Schema? This can help avoid schema conflicts due to
+    /// metadata.
+    ///
+    /// If None specified, uses value in SessionConfig
+    pub skip_metadata: Option<bool>,
 }
 
 impl<'a> Default for ParquetReadOptions<'a> {
     fn default() -> Self {
-        let format_default = ParquetFormat::default();
-
         Self {
             file_extension: DEFAULT_PARQUET_EXTENSION,
             table_partition_cols: vec![],
-            parquet_pruning: format_default.enable_pruning(),
-            skip_metadata: format_default.skip_metadata(),
+            parquet_pruning: None,
+            skip_metadata: None,
         }
     }
 }
@@ -194,7 +194,7 @@ impl<'a> Default for ParquetReadOptions<'a> {
 impl<'a> ParquetReadOptions<'a> {
     /// Specify parquet_pruning
     pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
-        self.parquet_pruning = parquet_pruning;
+        self.parquet_pruning = Some(parquet_pruning);
         self
     }
 
@@ -202,7 +202,7 @@ impl<'a> ParquetReadOptions<'a> {
     /// the file Schema. This can help avoid schema conflicts due to
     /// metadata.  Defaults to true.
     pub fn skip_metadata(mut self, skip_metadata: bool) -> Self {
-        self.skip_metadata = skip_metadata;
+        self.skip_metadata = Some(skip_metadata);
         self
     }
 
@@ -216,14 +216,14 @@ impl<'a> ParquetReadOptions<'a> {
     }
 
     /// Helper to convert these user facing options to `ListingTable` options
-    pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
-        let file_format = ParquetFormat::default()
+    pub fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions {
+        let file_format = ParquetFormat::new(config.config_options())
             .with_enable_pruning(self.parquet_pruning)
             .with_skip_metadata(self.skip_metadata);
 
         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
-            .with_target_partitions(target_partitions)
+            .with_target_partitions(config.target_partitions)
             .with_table_partition_cols(self.table_partition_cols.clone())
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 596d2b09e..edf72b523 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -72,6 +72,16 @@ use super::get_output_ordering;
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
+    /// Override for `Self::with_pushdown_filters`. If None, uses
+    /// values from base_config
+    pushdown_filters: Option<bool>,
+    /// Override for `Self::with_reorder_filters`. If None, uses
+    /// values from base_config
+    reorder_filters: Option<bool>,
+    /// Override for `Self::with_enable_page_index`. If None, uses
+    /// values from base_config
+    enable_page_index: Option<bool>,
+    /// Base configuraton for this scan
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
@@ -131,6 +141,9 @@ impl ParquetExec {
         let (projected_schema, projected_statistics) = base_config.project();
 
         Self {
+            pushdown_filters: None,
+            reorder_filters: None,
+            enable_page_index: None,
             base_config,
             projected_schema,
             projected_statistics,
@@ -172,20 +185,20 @@ impl ParquetExec {
     /// `ParquetRecordBatchStream`. These filters are applied by the
     /// parquet decoder to skip unecessairly decoding other columns
     /// which would not pass the predicate. Defaults to false
-    pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self {
-        self.base_config
-            .config_options
-            .write()
-            .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+    pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
+        self.pushdown_filters = Some(pushdown_filters);
         self
     }
 
     /// Return the value described in [`Self::with_pushdown_filters`]
     pub fn pushdown_filters(&self) -> bool {
-        self.base_config
-            .config_options
-            .read()
-            .get_bool(OPT_PARQUET_PUSHDOWN_FILTERS)
+        self.pushdown_filters
+            .or_else(|| {
+                self.base_config
+                    .config_options
+                    .read()
+                    .get_bool(OPT_PARQUET_PUSHDOWN_FILTERS)
+            })
             // default to false
             .unwrap_or_default()
     }
@@ -194,20 +207,20 @@ impl ParquetExec {
     /// minimize the cost of filter evaluation by reordering the
     /// predicate [`Expr`]s. If false, the predicates are applied in
     /// the same order as specified in the query. Defaults to false.
-    pub fn with_reorder_filters(self, reorder_filters: bool) -> Self {
-        self.base_config
-            .config_options
-            .write()
-            .set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+    pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self {
+        self.reorder_filters = Some(reorder_filters);
         self
     }
 
     /// Return the value described in [`Self::with_reorder_filters`]
     pub fn reorder_filters(&self) -> bool {
-        self.base_config
-            .config_options
-            .read()
-            .get_bool(OPT_PARQUET_REORDER_FILTERS)
+        self.reorder_filters
+            .or_else(|| {
+                self.base_config
+                    .config_options
+                    .read()
+                    .get_bool(OPT_PARQUET_REORDER_FILTERS)
+            })
             // default to false
             .unwrap_or_default()
     }
@@ -216,20 +229,20 @@ impl ParquetExec {
     /// This is used to optimise filter pushdown
     /// via `RowSelector` and `RowFilter` by
     /// eliminating unnecessary IO and decoding
-    pub fn with_enable_page_index(self, enable_page_index: bool) -> Self {
-        self.base_config
-            .config_options
-            .write()
-            .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
+    pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self {
+        self.enable_page_index = Some(enable_page_index);
         self
     }
 
     /// Return the value described in [`Self::with_enable_page_index`]
     pub fn enable_page_index(&self) -> bool {
-        self.base_config
-            .config_options
-            .read()
-            .get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX)
+        self.enable_page_index
+            .or_else(|| {
+                self.base_config
+                    .config_options
+                    .read()
+                    .get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX)
+            })
             // default to false
             .unwrap_or_default()
     }
@@ -1286,7 +1299,8 @@ mod tests {
     async fn parquet_exec_with_projection() -> Result<()> {
         let testdata = crate::test_util::parquet_test_data();
         let filename = "alltypes_plain.parquet";
-        let format = ParquetFormat::default();
+        let ctx = SessionContext::new();
+        let format = ParquetFormat::new(ctx.config_options());
         let parquet_exec =
             scan_format(&format, &testdata, filename, Some(vec![0, 1, 2]), None)
                 .await
@@ -1369,7 +1383,7 @@ mod tests {
         let meta = local_unpartitioned_file(filename);
 
         let store = Arc::new(LocalFileSystem::new()) as _;
-        let file_schema = ParquetFormat::default()
+        let file_schema = ParquetFormat::new(session_ctx.config_options())
             .infer_schema(&store, &[meta.clone()])
             .await?;
 
@@ -1416,7 +1430,7 @@ mod tests {
 
         let meta = local_unpartitioned_file(filename);
 
-        let schema = ParquetFormat::default()
+        let schema = ParquetFormat::new(session_ctx.config_options())
             .infer_schema(&store, &[meta.clone()])
             .await
             .unwrap();
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 788082e37..51c55912d 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -49,7 +49,7 @@ async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetE
         size: metadata.len() as usize,
     };
 
-    let schema = ParquetFormat::default()
+    let schema = ParquetFormat::new(session_ctx.config_options())
         .infer_schema(&store, &[meta.clone()])
         .await
         .unwrap();
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 41ae75864..40009bad6 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -562,7 +562,7 @@ async fn register_partitioned_alltypes_parquet(
         MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
     );
 
-    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
+    let options = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())))
         .with_table_partition_cols(
             partition_cols
                 .iter()
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 1849fc9f8..92c47989e 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -34,7 +34,13 @@ async fn test_with_parquet() -> Result<()> {
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
     let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-    let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?;
+    let exec = get_exec(
+        &session_ctx,
+        "alltypes_plain.parquet",
+        projection.as_ref(),
+        None,
+    )
+    .await?;
     let schema = exec.schema().clone();
 
     let batches = collect(exec, task_ctx).await?;
@@ -55,7 +61,13 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
     let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
-    let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?;
+    let exec = get_exec(
+        &session_ctx,
+        "alltypes_plain.parquet",
+        projection.as_ref(),
+        None,
+    )
+    .await?;
     let schema = exec.schema().clone();
 
     let batches = collect(exec, task_ctx).await?;
@@ -72,6 +84,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
 }
 
 async fn get_exec(
+    ctx: &SessionContext,
     file_name: &str,
     projection: Option<&Vec<usize>>,
     limit: Option<usize>,
@@ -81,7 +94,7 @@ async fn get_exec(
 
     let path = Path::from_filesystem_path(filename).unwrap();
 
-    let format = ParquetFormat::default();
+    let format = ParquetFormat::new(ctx.config_options());
     let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
     let object_store_url = ObjectStoreUrl::local_filesystem();
 
diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index fa2e20569..75b252ac1 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -707,8 +707,11 @@ async fn show_all() {
         "| datafusion.execution.coalesce_batches                     | true    |",
         "| datafusion.execution.coalesce_target_batch_size           | 4096    |",
         "| datafusion.execution.parquet.enable_page_index            | false   |",
+        "| datafusion.execution.parquet.metadata_size_hint           | NULL    |",
+        "| datafusion.execution.parquet.pruning                      | true    |",
         "| datafusion.execution.parquet.pushdown_filters             | false   |",
         "| datafusion.execution.parquet.reorder_filters              | false   |",
+        "| datafusion.execution.parquet.skip_metadata                | true    |",
         "| datafusion.execution.time_zone                            | +00:00  |",
         "| datafusion.explain.logical_plan_only                      | false   |",
         "| datafusion.explain.physical_plan_only                     | false   |",
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 18ebf57c0..e2a33c7c2 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -55,11 +55,11 @@ async fn parquet_query() {
 /// expressions make it all the way down to the ParquetExec
 async fn parquet_with_sort_order_specified() {
     let parquet_read_options = ParquetReadOptions::default();
-    let target_partitions = 2;
+    let session_config = SessionConfig::new().with_target_partitions(2);
 
     // The sort order is not specified
     let options_no_sort = parquet_read_options
-        .to_listing_options(target_partitions)
+        .to_listing_options(&session_config)
         .with_file_sort_order(None);
 
     // The sort order is specified (not actually correct in this case)
@@ -73,7 +73,7 @@ async fn parquet_with_sort_order_specified() {
         .collect::<Vec<_>>();
 
     let options_sort = parquet_read_options
-        .to_listing_options(target_partitions)
+        .to_listing_options(&session_config)
         .with_file_sort_order(Some(file_sort_order));
 
     // This string appears in ParquetExec if the output ordering is
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 346143348..78db01313 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -423,7 +423,8 @@ impl AsLogicalPlan for LogicalPlanNode {
                         &FileFormatType::Parquet(protobuf::ParquetFormat {
                             enable_pruning,
                         }) => Arc::new(
-                            ParquetFormat::default().with_enable_pruning(enable_pruning),
+                            ParquetFormat::new(ctx.config_options())
+                                .with_enable_pruning(Some(enable_pruning)),
                         ),
                         FileFormatType::Csv(protobuf::CsvFormat {
                             has_header,
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index a42050c75..8bb316433 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -43,8 +43,11 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.coalesce_batches                     | Boolean | true    | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. |
 | datafusion.execution.coalesce_target_batch_size           | UInt64  | 4096    | Target batch size when coalescing batches. Uses in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'.                                                                                                                                                                                                                        |
 | datafusion.execution.parquet.enable_page_index            | Boolean | false   | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                                                                                  |
+| datafusion.execution.parquet.metadata_size_hint           | UInt64  | NULL    | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two read are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer.                                                                                       |
+| datafusion.execution.parquet.pruning                      | Boolean | true    | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file.                                                                                                                                                                                              |
 | datafusion.execution.parquet.pushdown_filters             | Boolean | false   | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded.                                                                                                                                                                                                                                        |
 | datafusion.execution.parquet.reorder_filters              | Boolean | false   | If true, filter expressions evaluated during the parquet decoding opearation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query.                                                                                                                                 |
+| datafusion.execution.parquet.skip_metadata                | Boolean | true    | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata.                                                                                                            |
 | datafusion.execution.time_zone                            | Utf8    | +00:00  | The session time zone which some function require e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,                                                                                                                                                                                                                  |
 | then extract the hour.                                    |
 | datafusion.explain.logical_plan_only                      | Boolean | false   | When set to true, the explain statement will only print logical plans.                                                                                                                                                                                                                                                                                        |