You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/03 08:39:18 UTC

[arrow-datafusion] branch master updated: Structify ConfigOptions (#4517) (#4771)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 21169b9c1 Structify ConfigOptions (#4517) (#4771)
21169b9c1 is described below

commit 21169b9c10ddcdd80d71fc4b19a284962b36049f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Jan 3 08:39:13 2023 +0000

    Structify ConfigOptions (#4517) (#4771)
    
    * Structify ConfigOptions (#4517)
    
    * Simplify
    
    * Fix environment variables
    
    * Tweaks
    
    * Update datafusion-cli
    
    * Format
    
    * Misc cleanup
    
    * Further fixes
    
    * Format
    
    * Update config.md
    
    * Flatten configuration
    
    * Document macro
---
 benchmarks/src/bin/h2o.rs                          |    8 +-
 datafusion-cli/src/main.rs                         |    2 +-
 datafusion/core/src/bin/print_config_docs.rs       |    4 +-
 datafusion/core/src/catalog/information_schema.rs  |   14 +-
 datafusion/core/src/config.rs                      | 1031 ++++++++++----------
 .../core/src/datasource/file_format/parquet.rs     |   14 +-
 datafusion/core/src/execution/context.rs           |  300 ++----
 .../core/src/physical_optimizer/enforcement.rs     |   12 +-
 .../core/src/physical_optimizer/join_selection.rs  |    9 +-
 .../core/src/physical_optimizer/repartition.rs     |    6 +-
 .../core/src/physical_plan/coalesce_batches.rs     |   16 +-
 .../core/src/physical_plan/file_format/parquet.rs  |   16 +-
 datafusion/core/src/physical_plan/planner.rs       |   22 +-
 datafusion/core/tests/config_from_env.rs           |   23 +-
 datafusion/core/tests/fifo.rs                      |    7 +-
 datafusion/core/tests/parquet/mod.rs               |    6 +-
 datafusion/core/tests/sql/explain_analyze.rs       |   22 +-
 datafusion/core/tests/sql/mod.rs                   |   21 +-
 datafusion/core/tests/sql/set_variable.rs          |   12 +-
 .../test_files/information_schema.slt              |    3 +-
 docs/source/user-guide/configs.md                  |   59 +-
 parquet-test-utils/src/lib.rs                      |   14 +-
 22 files changed, 715 insertions(+), 906 deletions(-)

diff --git a/benchmarks/src/bin/h2o.rs b/benchmarks/src/bin/h2o.rs
index 446210f68..c747cb0a9 100644
--- a/benchmarks/src/bin/h2o.rs
+++ b/benchmarks/src/bin/h2o.rs
@@ -18,12 +18,13 @@
 //! DataFusion h2o benchmarks
 
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::config::ConfigOptions;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 use datafusion::datasource::MemTable;
-use datafusion::prelude::{CsvReadOptions, SessionConfig};
+use datafusion::prelude::CsvReadOptions;
 use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -63,9 +64,10 @@ async fn main() -> Result<()> {
 
 async fn group_by(opt: &GroupBy) -> Result<()> {
     let path = opt.path.to_str().unwrap();
-    let config = SessionConfig::from_env().with_batch_size(65535);
+    let mut config = ConfigOptions::from_env()?;
+    config.execution.batch_size = 65535;
 
-    let ctx = SessionContext::with_config(config);
+    let ctx = SessionContext::with_config(config.into());
 
     let schema = Schema::new(vec![
         Field::new("id1", DataType::Utf8, false),
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index c4e198ff0..b9ada1a1f 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -96,7 +96,7 @@ pub async fn main() -> Result<()> {
         env::set_current_dir(p).unwrap();
     };
 
-    let mut session_config = SessionConfig::from_env().with_information_schema(true);
+    let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
 
     if let Some(batch_size) = args.batch_size {
         session_config = session_config.with_batch_size(batch_size);
diff --git a/datafusion/core/src/bin/print_config_docs.rs b/datafusion/core/src/bin/print_config_docs.rs
index 0a6415c62..f0390f2f6 100644
--- a/datafusion/core/src/bin/print_config_docs.rs
+++ b/datafusion/core/src/bin/print_config_docs.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::config::BuiltInConfigs;
+use datafusion::config::ConfigOptions;
 
 fn main() {
-    let docs = BuiltInConfigs::generate_config_markdown();
+    let docs = ConfigOptions::generate_config_markdown();
     println!("{docs}");
 }
diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index ec86acae3..cad20ebfe 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -27,7 +27,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 
-use crate::config::ConfigOptions;
+use crate::config::{ConfigEntry, ConfigOptions};
 use crate::datasource::streaming::{PartitionStream, StreamingTable};
 use crate::datasource::TableProvider;
 use crate::execution::context::TaskContext;
@@ -162,8 +162,8 @@ impl InformationSchemaConfig {
         config_options: &ConfigOptions,
         builder: &mut InformationSchemaDfSettingsBuilder,
     ) {
-        for (name, setting) in config_options.options() {
-            builder.add_setting(name, setting.to_string());
+        for entry in config_options.entries() {
+            builder.add_setting(entry);
         }
     }
 }
@@ -611,7 +611,7 @@ impl InformationSchemaDfSettings {
     fn new(config: InformationSchemaConfig) -> Self {
         let schema = Arc::new(Schema::new(vec![
             Field::new("name", DataType::Utf8, false),
-            Field::new("setting", DataType::Utf8, false),
+            Field::new("setting", DataType::Utf8, true),
         ]));
 
         Self { schema, config }
@@ -656,9 +656,9 @@ struct InformationSchemaDfSettingsBuilder {
 }
 
 impl InformationSchemaDfSettingsBuilder {
-    fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) {
-        self.names.append_value(name.as_ref());
-        self.settings.append_value(setting.as_ref());
+    fn add_setting(&mut self, entry: ConfigEntry) {
+        self.names.append_value(entry.key);
+        self.settings.append_option(entry.value);
     }
 
     fn finish(&mut self) -> RecordBatch {
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 508e4be50..4e6cddcd5 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -17,468 +17,364 @@
 
 //! DataFusion Configuration Options
 
-use arrow::datatypes::DataType;
-use datafusion_common::ScalarValue;
-use itertools::Itertools;
-use log::warn;
-use std::collections::{BTreeMap, HashMap};
-use std::env;
-use std::fmt::{Debug, Formatter};
-
-/*-************************************
-*  Catalog related
-**************************************/
-/// Configuration option "datafusion.catalog.create_default_catalog_and_schema"
-pub const OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA: &str =
-    "datafusion.catalog.create_default_catalog_and_schema";
-
-/// Configuration option "datafusion.catalog.information_schema"
-pub const OPT_INFORMATION_SCHEMA: &str = "datafusion.catalog.information_schema";
-
-/// Location scanned to load tables for `default` schema
-pub const OPT_CATALOG_LOCATION: &str = "datafusion.catalog.location";
-
-/// Type of `TableProvider` to use when loading `default` schema
-pub const OPT_CATALOG_TYPE: &str = "datafusion.catalog.type";
-
-/*-************************************
-*  Execution related
-**************************************/
-/// Configuration option "datafusion.execution.batch_size"
-pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";
-
-/// Configuration option "datafusion.execution.coalesce_batches"
-pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";
-
-/// Configuration option "datafusion.execution.collect_statistics"
-pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execution.collect_statistics";
-
-/// Configuration option "datafusion.execution.target_partitions"
-pub const OPT_TARGET_PARTITIONS: &str = "datafusion.execution.target_partitions";
-
-/// Configuration option "datafusion.execution.time_zone"
-pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";
-
-/*-************************************
-*  Execution parquet related
-**************************************/
-/// Configuration option "datafusion.execution.parquet.enable_page_index"
-pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
-    "datafusion.execution.parquet.enable_page_index";
-
-/// Configuration option "datafusion.execution.parquet.pruning"
-pub const OPT_PARQUET_ENABLE_PRUNING: &str = "datafusion.execution.parquet.pruning";
-
-/// Configuration option "datafusion.execution.parquet.pushdown_filters"
-pub const OPT_PARQUET_PUSHDOWN_FILTERS: &str =
-    "datafusion.execution.parquet.pushdown_filters";
-
-/// Configuration option "datafusion.execution.parquet.reorder_filters"
-pub const OPT_PARQUET_REORDER_FILTERS: &str =
-    "datafusion.execution.parquet.reorder_filters";
-
-/// Configuration option "datafusion.execution.parquet.skip_metadata"
-pub const OPT_PARQUET_SKIP_METADATA: &str = "datafusion.execution.parquet.skip_metadata";
-
-/// Configuration option "datafusion.execution.parquet.metadata_size_hint"
-pub const OPT_PARQUET_METADATA_SIZE_HINT: &str =
-    "datafusion.execution.parquet.metadata_size_hint";
-
-/*-************************************
-*  Explain related
-**************************************/
-/// Configuration option "datafusion.explain.logical_plan_only"
-pub const OPT_EXPLAIN_LOGICAL_PLAN_ONLY: &str = "datafusion.explain.logical_plan_only";
-
-/// Configuration option "datafusion.explain.physical_plan_only"
-pub const OPT_EXPLAIN_PHYSICAL_PLAN_ONLY: &str = "datafusion.explain.physical_plan_only";
-
-/*-************************************
-*  Optimizer related
-**************************************/
-/// Configuration option "datafusion.optimizer.filter_null_join_keys"
-pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";
-
-/// Configuration option "datafusion.optimizer.repartition_aggregations"
-pub const OPT_REPARTITION_AGGREGATIONS: &str =
-    "datafusion.optimizer.repartition_aggregations";
-
-/// Configuration option "datafusion.optimizer.repartition_joins"
-pub const OPT_REPARTITION_JOINS: &str = "datafusion.optimizer.repartition_joins";
-
-/// Configuration option "datafusion.optimizer.repartition_windows"
-pub const OPT_REPARTITION_WINDOWS: &str = "datafusion.optimizer.repartition_windows";
-
-/// Configuration option "datafusion.optimizer.skip_failed_rules"
-pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
-    "datafusion.optimizer.skip_failed_rules";
-
-/// Configuration option "datafusion.optimizer.max_passes"
-pub const OPT_OPTIMIZER_MAX_PASSES: &str = "datafusion.optimizer.max_passes";
-
-/// Configuration option "datafusion.optimizer.top_down_join_key_reordering"
-pub const OPT_TOP_DOWN_JOIN_KEY_REORDERING: &str =
-    "datafusion.optimizer.top_down_join_key_reordering";
-
-/// Configuration option "datafusion.optimizer.prefer_hash_join"
-pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";
-
-/// Configuration option "atafusion.optimizer.hash_join_single_partition_threshold"
-pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
-    "datafusion.optimizer.hash_join_single_partition_threshold";
-
-/// Configuration option "datafusion.execution.round_robin_repartition"
-pub const OPT_ENABLE_ROUND_ROBIN_REPARTITION: &str =
-    "datafusion.optimizer.enable_round_robin_repartition";
+use datafusion_common::{DataFusionError, Result};
+use std::any::Any;
+use std::collections::BTreeMap;
+use std::fmt::Display;
+
+/// A macro that wraps a configuration struct and automatically derives
+/// [`Default`] and [`ConfigField`] for it, allowing it to be used
+/// in the [`ConfigOptions`] configuration tree
+///
+/// For example,
+///
+/// ```ignore
+/// config_namespace! {
+///    /// Amazing config
+///    pub struct MyConfig {
+///        /// Field 1 doc
+///        field1: String, default = "".to_string()
+///
+///        /// Field 2 doc
+///        field2: usize, default = 232
+///
+///        /// Field 3 doc
+///        field3: Option<usize>, default = None
+///    }
+///}
+/// ```
+///
+/// Will generate
+///
+/// ```ignore
+/// /// Amazing config
+/// #[derive(Debug, Clone)]
+/// #[non_exhaustive]
+/// pub struct MyConfig {
+///     /// Field 1 doc
+///     field1: String,
+///     /// Field 2 doc
+///     field2: usize,
+///     /// Field 3 doc
+///     field3: Option<usize>,
+/// }
+/// impl ConfigField for MyConfig {
+///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
+///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
+///         match key {
+///             "field1" => self.field1.set(rem, value),
+///             "field2" => self.field2.set(rem, value),
+///             "field3" => self.field3.set(rem, value),
+///             _ => Err(DataFusionError::Internal(format!(
+///                 "Config value \"{}\" not found on MyConfig",
+///                 key
+///             ))),
+///         }
+///     }
+///
+///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
+///         let key = format!("{}.field1", key_prefix);
+///         let desc = "Field 1 doc";
+///         self.field1.visit(v, key.as_str(), desc);
+///         let key = format!("{}.field2", key_prefix);
+///         let desc = "Field 2 doc";
+///         self.field2.visit(v, key.as_str(), desc);
+///         let key = format!("{}.field3", key_prefix);
+///         let desc = "Field 3 doc";
+///         self.field3.visit(v, key.as_str(), desc);
+///     }
+/// }
+///
+/// impl Default for MyConfig {
+///     fn default() -> Self {
+///         Self {
+///             field1: "".to_string(),
+///             field2: 232,
+///             field3: None,
+///         }
+///     }
+/// }
+/// ```
+///
+/// NB: Misplaced commas may result in nonsensical errors
+///
+macro_rules! config_namespace {
+    (
+     $(#[doc = $struct_d:tt])*
+     $vis:vis struct $struct_name:ident {
+        $(
+        $(#[doc = $d:tt])*
+        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
+        )*$(,)*
+    }
+    ) => {
+
+        $(#[doc = $struct_d])*
+        #[derive(Debug, Clone)]
+        #[non_exhaustive]
+        $vis struct $struct_name{
+            $(
+            $(#[doc = $d])*
+            $field_vis $field_name : $field_type,
+            )*
+        }
 
-/// Definition of a configuration option
-pub struct ConfigDefinition {
-    /// key used to identifier this configuration option
-    key: String,
-    /// Description to be used in generated documentation
-    description: String,
-    /// Data type of this option
-    data_type: DataType,
-    /// Default value
-    default_value: ScalarValue,
-}
+        impl ConfigField for $struct_name {
+            fn set(&mut self, key: &str, value: &str) -> Result<()> {
+                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
+                match key {
+                    $(
+                       stringify!($field_name) => self.$field_name.set(rem, value),
+                    )*
+                    _ => Err(DataFusionError::Internal(
+                        format!(concat!("Config value \"{}\" not found on ", stringify!($struct_name)), key)
+                    ))
+                }
+            }
 
-macro_rules! get_conf_value {
-    ($SELF: expr, $TPE: ident, $KEY: expr, $TPE_NAME: expr) => {
-        match $SELF.get($KEY) {
-            Some(ScalarValue::$TPE(v)) => v,
-            Some(v) => {
-                warn!(
-                    "Config type mismatch for {}. Expected: {}, got: {:?}",
-                    $KEY, $TPE_NAME, &v
-                );
-                None
+            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
+                $(
+                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
+                let desc = concat!($($d),*).trim();
+                self.$field_name.visit(v, key.as_str(), desc);
+                )*
             }
-            None => None,
         }
-    };
-}
 
-impl ConfigDefinition {
-    /// Create a configuration option definition
-    pub fn new(
-        name: impl Into<String>,
-        description: impl Into<String>,
-        data_type: DataType,
-        default_value: ScalarValue,
-    ) -> Self {
-        Self {
-            key: name.into(),
-            description: description.into(),
-            data_type,
-            default_value,
+        impl Default for $struct_name {
+            fn default() -> Self {
+                Self {
+                    $($field_name: $default),*
+                }
+            }
         }
     }
+}
 
-    /// Create a configuration option definition with a boolean value
-    pub fn new_bool(
-        key: impl Into<String>,
-        description: impl Into<String>,
-        default_value: bool,
-    ) -> Self {
-        Self::new(
-            key,
-            description,
-            DataType::Boolean,
-            ScalarValue::Boolean(Some(default_value)),
-        )
-    }
+config_namespace! {
+    /// Options related to catalog and directory scanning
+    pub struct CatalogOptions {
+        /// Number of partitions for query execution. Increasing partitions can increase
+        /// concurrency. Defaults to the number of cpu cores on the system.
+        pub create_default_catalog_and_schema: bool, default = true
 
-    /// Create a configuration option definition with a u64 value
-    pub fn new_u64(
-        key: impl Into<String>,
-        description: impl Into<String>,
-        default_value: u64,
-    ) -> Self {
-        Self::new(
-            key,
-            description,
-            DataType::UInt64,
-            ScalarValue::UInt64(Some(default_value)),
-        )
-    }
+        /// Should DataFusion provide access to `information_schema`
+        /// virtual tables for displaying schema information
+        pub information_schema: bool, default = false
 
-    /// Create a configuration option definition with a string value
-    pub fn new_string(
-        key: impl Into<String>,
-        description: impl Into<String>,
-        default_value: Option<String>,
-    ) -> Self {
-        Self::new(
-            key,
-            description,
-            DataType::Utf8,
-            ScalarValue::Utf8(default_value),
-        )
+        /// Location scanned to load tables for `default` schema
+        pub location: Option<String>, default = None
+
+        /// Type of `TableProvider` to use when loading `default` schema
+        pub format: Option<String>, default = None
+
+        /// If the file has a header
+        pub has_header: bool, default = false
     }
 }
 
-/// Contains definitions for all built-in configuration options
-pub struct BuiltInConfigs {
-    /// Configuration option definitions
-    config_definitions: Vec<ConfigDefinition>,
+config_namespace! {
+    /// Options related to query execution
+    pub struct ExecutionOptions {
+        /// Default batch size while creating new batches, it's especially useful for
+        /// buffer-in-memory batches since creating tiny batches would results in too much
+        /// metadata memory consumption
+        pub batch_size: usize, default = 8192
+
+        /// When set to true, record batches will be examined between each operator and
+        /// small batches will be coalesced into larger batches. This is helpful when there
+        /// are highly selective filters or joins that could produce tiny output batches. The
+        /// target batch size is determined by the configuration setting
+        pub coalesce_batches: bool, default = true
+
+        /// Should DataFusion collect statistics after listing files
+        pub collect_statistics: bool, default = false
+
+        /// Number of partitions for query execution. Increasing partitions can increase
+        /// concurrency. Defaults to the number of cpu cores on the system
+        pub target_partitions: usize, default = num_cpus::get()
+
+        /// The default time zone
+        ///
+        /// Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the underlying datetime
+        /// according to this time zone, and then extract the hour
+        pub time_zone: Option<String>, default = Some("+00:00".into())
+
+        /// Parquet options
+        pub parquet: ParquetOptions, default = Default::default()
+    }
 }
 
-impl Default for BuiltInConfigs {
-    fn default() -> Self {
-        Self::new()
+config_namespace! {
+    /// Options related to reading of parquet files
+    pub struct ParquetOptions {
+        /// If true, uses parquet data page level metadata (Page Index) statistics
+        /// to reduce the number of rows decoded.
+        pub enable_page_index: bool, default = false
+
+        /// If true, the parquet reader attempts to skip entire row groups based
+        /// on the predicate in the query and the metadata (min/max values) stored in
+        /// the parquet file
+        pub pruning: bool, default = true
+
+        /// If true, the parquet reader skip the optional embedded metadata that may be in
+        /// the file Schema. This setting can help avoid schema conflicts when querying
+        /// multiple parquet files with schemas containing compatible types but different metadata
+        pub skip_metadata: bool, default = true
+
+        /// If specified, the parquet reader will try and fetch the last `size_hint`
+        /// bytes of the parquet file optimistically. If not specified, two read are required:
+        /// One read to fetch the 8-byte parquet footer and
+        /// another to fetch the metadata length encoded in the footer
+        pub metadata_size_hint: Option<usize>, default = None
+
+        /// If true, filter expressions are be applied during the parquet decoding operation to
+        /// reduce the number of rows decoded
+        pub pushdown_filters: bool, default = false
+
+        /// If true, filter expressions evaluated during the parquet decoding operation
+        /// will be reordered heuristically to minimize the cost of evaluation. If false,
+        /// the filters are applied in the same order as written in the query
+        pub reorder_filters: bool, default = false
     }
 }
 
-impl BuiltInConfigs {
-    /// Create a new BuiltInConfigs struct containing definitions for all built-in
-    /// configuration options
-    pub fn new() -> Self {
-        Self {
-            config_definitions: vec![ConfigDefinition::new_u64(
-                OPT_TARGET_PARTITIONS,
-                "Number of partitions for query execution. Increasing partitions can increase \
-                 concurrency. Defaults to the number of cpu cores on the system.",
-                num_cpus::get() as u64,
-            ),
-
-            ConfigDefinition::new_bool(
-                OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA,
-                "Whether the default catalog and schema should be created automatically.",
-                true
-            ),
-
-            ConfigDefinition::new_bool(
-                OPT_INFORMATION_SCHEMA,
-                "Should DataFusion provide access to `information_schema` \
-                 virtual tables for displaying schema information",
-                false
-            ),
-
-            ConfigDefinition::new_bool(
-                OPT_REPARTITION_JOINS,
-                "Should DataFusion repartition data using the join keys to execute joins in parallel \
-                 using the provided `target_partitions` level",
-                true
-            ),
-
-            ConfigDefinition::new_bool(
-                OPT_REPARTITION_AGGREGATIONS,
-                "Should DataFusion repartition data using the aggregate keys to execute aggregates \
-                 in parallel using the provided `target_partitions` level",
-                true
-            ),
-
-            ConfigDefinition::new_bool(
-                OPT_REPARTITION_WINDOWS,
-                "Should DataFusion repartition data using the partitions keys to execute window \
-                 functions in parallel using the provided `target_partitions` level",
-                true
-            ),
-
-            ConfigDefinition::new_bool(
-                OPT_COLLECT_STATISTICS,
-                "Should DataFusion collect statistics after listing files",
-                false
-            ),
-
-            ConfigDefinition::new_bool(
-                OPT_FILTER_NULL_JOIN_KEYS,
-                "When set to true, the optimizer will insert filters before a join between \
-                a nullable and non-nullable column to filter out nulls on the nullable side. This \
-                filter can add additional overhead when the file format does not fully support \
-                predicate push down.",
-                false,
-            ),
-            ConfigDefinition::new_bool(
-                OPT_EXPLAIN_LOGICAL_PLAN_ONLY,
-                "When set to true, the explain statement will only print logical plans.",
-                false,
-            ),
-            ConfigDefinition::new_bool(
-                OPT_EXPLAIN_PHYSICAL_PLAN_ONLY,
-                "When set to true, the explain statement will only print physical plans.",
-                false,
-            ),
-            ConfigDefinition::new_u64(
-                OPT_BATCH_SIZE,
-                "Default batch size while creating new batches, it's especially useful for \
-                 buffer-in-memory batches since creating tiny batches would results in too much metadata \
-                 memory consumption.",
-                8192,
-            ),
-            ConfigDefinition::new_bool(
-                OPT_COALESCE_BATCHES,
-                format!("When set to true, record batches will be examined between each operator and \
-                small batches will be coalesced into larger batches. This is helpful when there \
-                are highly selective filters or joins that could produce tiny output batches. The \
-                target batch size is determined by the configuration setting \
-                '{}'.", OPT_BATCH_SIZE),
-                true,
-            ),
-            ConfigDefinition::new_string(
-                OPT_TIME_ZONE,
-                "The session time zone which some function require \
-                e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, \
-                then extract the hour.",
-                Some("+00:00".into()),
-            ),
-            ConfigDefinition::new_bool(
-                OPT_PARQUET_PUSHDOWN_FILTERS,
-                "If true, filter expressions are be applied during the parquet decoding operation to \
-                 reduce the number of rows decoded.",
-                false,
-            ),
-            ConfigDefinition::new_bool(
-                OPT_PARQUET_REORDER_FILTERS,
-                "If true, filter expressions evaluated during the parquet decoding opearation \
-                 will be reordered heuristically to minimize the cost of evaluation. If false, \
-                 the filters are applied in the same order as written in the query.",
-                false,
-            ),
-            ConfigDefinition::new_bool(
-                OPT_PARQUET_ENABLE_PAGE_INDEX,
-                "If true, uses parquet data page level metadata (Page Index) statistics \
-                 to reduce the number of rows decoded.",
-                false,
-            ),
-            ConfigDefinition::new_bool(
-                OPT_PARQUET_ENABLE_PRUNING,
-                "If true, the parquet reader attempts to skip entire row groups based \
-                 on the predicate in the query and the metadata (min/max values) stored in \
-                 the parquet file.",
-                true,
-            ),
-            ConfigDefinition::new_bool(
-                OPT_PARQUET_SKIP_METADATA,
-                "If true, the parquet reader skip the optional embedded metadata that may be in \
-                the file Schema. This setting can help avoid schema conflicts when querying \
-                multiple parquet files with schemas containing compatible types but different metadata.",
-                true,
-            ),
-            ConfigDefinition::new(
-                OPT_PARQUET_METADATA_SIZE_HINT,
-                "If specified, the parquet reader will try and fetch the last `size_hint` \
-                 bytes of the parquet file optimistically. If not specified, two read are required: \
-                 One read to fetch the 8-byte parquet footer and  \
-                 another to fetch the metadata length encoded in the footer.",
-                DataType::UInt64,
-                ScalarValue::UInt64(None),
-            ),
-            ConfigDefinition::new_bool(
-                OPT_OPTIMIZER_SKIP_FAILED_RULES,
-                "When set to true, the logical plan optimizer will produce warning \
-                messages if any optimization rules produce errors and then proceed to the next \
-                rule. When set to false, any rules that produce errors will cause the query to fail.",
-                true
-            ),
-            ConfigDefinition::new_u64(
-                OPT_OPTIMIZER_MAX_PASSES,
-                "Number of times that the optimizer will attempt to optimize the plan",
-                3
-            ),
-            ConfigDefinition::new_string(
-                OPT_CATALOG_LOCATION,
-                "Location scanned to load tables for `default` schema, defaults to None",
-                None,
-            ),
-            ConfigDefinition::new_string(
-                OPT_CATALOG_TYPE,
-                "Type of `TableProvider` to use when loading `default` schema. Defaults to None",
-                None,
-            ),
-             ConfigDefinition::new_bool(
-                 OPT_TOP_DOWN_JOIN_KEY_REORDERING,
-                 "When set to true, the physical plan optimizer will run a top down process to reorder the join keys. Defaults to true",
-                 true,
-             ),
-             ConfigDefinition::new_bool(
-                 OPT_PREFER_HASH_JOIN,
-                 "When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently\
-                 than SortMergeJoin but consumes more memory. Defaults to true",
-                 true,
-             ),
-             ConfigDefinition::new_u64(
-                 OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD,
-                 "The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition",
-                 1024 * 1024,
-             ),
-             ConfigDefinition::new_bool(
-                 OPT_ENABLE_ROUND_ROBIN_REPARTITION,
-                 "When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores",
-                 true,
-             ),
-            ]
-        }
+config_namespace! {
+    /// Options related to query optimization
+    pub struct OptimizerOptions {
+        /// When set to true, the physical plan optimizer will try to add round robin
+        /// repartition to increase parallelism to leverage more CPU cores
+        pub enable_round_robin_repartition: bool, default = true
+
+        /// When set to true, the optimizer will insert filters before a join between
+        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
+        /// filter can add additional overhead when the file format does not fully support
+        /// predicate push down.
+        pub filter_null_join_keys: bool, default = false
+
+        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
+        /// in parallel using the provided `target_partitions` level"
+        pub repartition_aggregations: bool, default = true
+
+        /// Should DataFusion repartition data using the join keys to execute joins in parallel
+        /// using the provided `target_partitions` level"
+        pub repartition_joins: bool, default = true
+
+        /// Should DataFusion repartition data using the partitions keys to execute window
+        /// functions in parallel using the provided `target_partitions` level"
+        pub repartition_windows: bool, default = true
+
+        /// When set to true, the logical plan optimizer will produce warning
+        /// messages if any optimization rules produce errors and then proceed to the next
+        /// rule. When set to false, any rules that produce errors will cause the query to fail
+        pub skip_failed_rules: bool, default = true
+
+        /// Number of times that the optimizer will attempt to optimize the plan
+        pub max_passes: usize, default = 3
+
+        /// When set to true, the physical plan optimizer will run a top down
+        /// process to reorder the join keys
+        pub top_down_join_key_reordering: bool, default = true
+
+        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
+        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
+        pub prefer_hash_join: bool, default = true
+
+        /// The maximum estimated size in bytes for one input side of a HashJoin
+        /// will be collected into a single partition
+        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
     }
+}
 
-    /// Generate documentation that can be included in the user guide
-    pub fn generate_config_markdown() -> String {
-        use std::fmt::Write as _;
-        let configs = Self::new();
-        let mut docs = "| key | type | default | description |\n".to_string();
-        docs += "|-----|------|---------|-------------|\n";
-
-        let config_definitions: Vec<_> = configs
-            .config_definitions
-            .into_iter()
-            .map(normalize_for_display)
-            .collect();
+config_namespace! {
+    /// Options controlling explain output
+    pub struct ExplainOptions {
+        /// When set to true, the explain statement will only print logical plans
+        pub logical_plan_only: bool, default = false
 
-        for config in config_definitions.iter().sorted_by_key(|c| c.key.as_str()) {
-            let _ = writeln!(
-                &mut docs,
-                "| {} | {} | {} | {} |",
-                config.key, config.data_type, config.default_value, config.description
-            );
-        }
-        docs
+        /// When set to true, the explain statement will only print physical plans
+        pub physical_plan_only: bool, default = false
     }
 }
 
-/// Normalizes a config definition prior to markdown display
-fn normalize_for_display(mut v: ConfigDefinition) -> ConfigDefinition {
-    // Since the default value of target_partitions depends on the number of cores,
-    // set the default value to 0 in the docs.
-    if v.key == OPT_TARGET_PARTITIONS {
-        v.default_value = ScalarValue::UInt64(Some(0))
-    }
-    v
+/// A key value pair, with a corresponding description
+#[derive(Debug)]
+pub struct ConfigEntry {
+    /// A unique string to identify this config value
+    pub key: String,
+
+    /// The value if any
+    pub value: Option<String>,
+
+    /// A description of this configuration entry
+    pub description: &'static str,
 }
 
-/// Configuration options struct. This can contain values for built-in and custom options
-#[derive(Clone)]
+/// Configuration options struct, able to store both built-in configuration and custom options
+#[derive(Debug, Clone, Default)]
+#[non_exhaustive]
 pub struct ConfigOptions {
-    options: HashMap<String, ScalarValue>,
+    /// Catalog options
+    pub catalog: CatalogOptions,
+    /// Execution options
+    pub execution: ExecutionOptions,
+    /// Explain options
+    pub optimizer: OptimizerOptions,
+    /// Explain options
+    pub explain: ExplainOptions,
+    /// Optional extensions registered using [`Extensions::insert`]
+    pub extensions: Extensions,
 }
 
-/// Print the configurations in an ordered way so that we can directly compare the equality of two ConfigOptions by their debug strings
-impl Debug for ConfigOptions {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("ConfigOptions")
-            .field(
-                "options",
-                &format!("{:?}", BTreeMap::from_iter(self.options.iter())),
-            )
-            .finish()
+impl ConfigField for ConfigOptions {
+    fn set(&mut self, key: &str, value: &str) -> Result<()> {
+        // Extensions are handled in the public `ConfigOptions::set`
+        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
+        match key {
+            "catalog" => self.catalog.set(rem, value),
+            "execution" => self.execution.set(rem, value),
+            "optimizer" => self.optimizer.set(rem, value),
+            "explain" => self.explain.set(rem, value),
+            _ => Err(DataFusionError::Internal(format!(
+                "Config value \"{}\" not found on ConfigOptions",
+                key
+            ))),
+        }
     }
-}
 
-impl Default for ConfigOptions {
-    fn default() -> Self {
-        Self::new()
+    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
+        self.catalog.visit(v, "datafusion.catalog", "");
+        self.execution.visit(v, "datafusion.execution", "");
+        self.optimizer.visit(v, "datafusion.optimizer", "");
+        self.explain.visit(v, "datafusion.explain", "");
     }
 }
 
 impl ConfigOptions {
-    /// Create new ConfigOptions struct
+    /// Creates a new [`ConfigOptions`] with default values
     pub fn new() -> Self {
-        let built_in = BuiltInConfigs::new();
-        let mut options = HashMap::with_capacity(built_in.config_definitions.len());
-        for config_def in &built_in.config_definitions {
-            options.insert(config_def.key.clone(), config_def.default_value.clone());
+        Self::default()
+    }
+
+    /// Set a configuration option
+    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
+        let (prefix, key) = key.split_once('.').ok_or_else(|| {
+            DataFusionError::External(
+                format!("could not find config namespace for key \"{key}\"",).into(),
+            )
+        })?;
+
+        if prefix == "datafusion" {
+            return ConfigField::set(self, key, value);
         }
-        Self { options }
+
+        let e = self.extensions.0.get_mut(prefix);
+        let e = e.ok_or_else(|| {
+            DataFusionError::External(
+                format!("Could not find config namespace \"{prefix}\"",).into(),
+            )
+        })?;
+        e.0.set(key, value)
     }
 
     /// Create new ConfigOptions struct, taking values from
@@ -486,137 +382,214 @@ impl ConfigOptions {
     ///
     /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
     /// control `datafusion.execution.batch_size`.
-    pub fn from_env() -> Self {
-        let built_in = BuiltInConfigs::new();
-        let mut options = HashMap::with_capacity(built_in.config_definitions.len());
-        for config_def in &built_in.config_definitions {
-            let config_value = {
-                let mut env_key = config_def.key.replace('.', "_");
-                env_key.make_ascii_uppercase();
-                match env::var(&env_key) {
-                    Ok(value) => match ScalarValue::try_from_string(
-                        value.clone(),
-                        &config_def.data_type,
-                    ) {
-                        Ok(parsed) => parsed,
-                        Err(_) => {
-                            warn!("Warning: could not parse environment variable {}={} to type {}.", env_key, value, config_def.data_type);
-                            config_def.default_value.clone()
-                        }
-                    },
-                    Err(_) => config_def.default_value.clone(),
-                }
-            };
-            options.insert(config_def.key.clone(), config_value);
+    pub fn from_env() -> Result<Self> {
+        struct Visitor(Vec<String>);
+
+        impl Visit for Visitor {
+            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
+                self.0.push(key.to_string())
+            }
+
+            fn none(&mut self, key: &str, _: &'static str) {
+                self.0.push(key.to_string())
+            }
         }
-        Self { options }
-    }
 
-    /// set a configuration option
-    pub fn set(&mut self, key: &str, value: ScalarValue) {
-        self.options.insert(key.to_string(), value);
-    }
+        // Extract the names of all fields and then look up the corresponding
+        // environment variables. This isn't hugely efficient but avoids
+        // ambiguity between `a.b` and `a_b` which would both correspond
+        // to an environment variable of `A_B`
 
-    /// set a boolean configuration option
-    pub fn set_bool(&mut self, key: &str, value: bool) {
-        self.set(key, ScalarValue::Boolean(Some(value)))
-    }
+        let mut keys = Visitor(vec![]);
+        let mut ret = Self::default();
+        ret.visit(&mut keys, "datafusion", "");
 
-    /// set a `u64` configuration option
-    pub fn set_u64(&mut self, key: &str, value: u64) {
-        self.set(key, ScalarValue::UInt64(Some(value)))
-    }
+        for key in keys.0 {
+            let env = key.to_uppercase().replace('.', "_");
+            if let Some(var) = std::env::var_os(env) {
+                ret.set(&key, var.to_string_lossy().as_ref())?;
+            }
+        }
 
-    /// set a `usize` configuration option
-    pub fn set_usize(&mut self, key: &str, value: usize) {
-        let value: u64 = value.try_into().expect("convert u64 to usize");
-        self.set(key, ScalarValue::UInt64(Some(value)))
+        Ok(ret)
     }
 
-    /// set a `String` configuration option
-    pub fn set_string(&mut self, key: &str, value: impl Into<String>) {
-        self.set(key, ScalarValue::Utf8(Some(value.into())))
-    }
+    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
+    pub fn entries(&self) -> Vec<ConfigEntry> {
+        struct Visitor(Vec<ConfigEntry>);
+
+        impl Visit for Visitor {
+            fn some<V: Display>(
+                &mut self,
+                key: &str,
+                value: V,
+                description: &'static str,
+            ) {
+                self.0.push(ConfigEntry {
+                    key: key.to_string(),
+                    value: Some(value.to_string()),
+                    description,
+                })
+            }
 
-    /// get a configuration option
-    pub fn get(&self, key: &str) -> Option<ScalarValue> {
-        self.options.get(key).cloned()
-    }
+            fn none(&mut self, key: &str, description: &'static str) {
+                self.0.push(ConfigEntry {
+                    key: key.to_string(),
+                    value: None,
+                    description,
+                })
+            }
+        }
+
+        let mut v = Visitor(vec![]);
+        self.visit(&mut v, "datafusion", "");
 
-    /// get a boolean configuration option
-    pub fn get_bool(&self, key: &str) -> Option<bool> {
-        get_conf_value!(self, Boolean, key, "bool")
+        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
+        v.0
     }
 
-    /// get a u64 configuration option
-    pub fn get_u64(&self, key: &str) -> Option<u64> {
-        get_conf_value!(self, UInt64, key, "u64")
+    /// Generate documentation that can be included in the user guide
+    pub fn generate_config_markdown() -> String {
+        use std::fmt::Write as _;
+
+        let mut s = Self::default();
+        s.execution.target_partitions = 0; // Normalize for display
+
+        let mut docs = "| key | default | description |\n".to_string();
+        docs += "|-----|---------|-------------|\n";
+        let mut entries = s.entries();
+        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
+
+        for entry in s.entries() {
+            let _ = writeln!(
+                &mut docs,
+                "| {} | {} | {} |",
+                entry.key,
+                entry.value.as_deref().unwrap_or("NULL"),
+                entry.description
+            );
+        }
+        docs
     }
+}
+
+/// [`ConfigExtension`] provides a mechanism to store third-party configuration within DataFusion
+///
+/// Unfortunately associated constants are not currently object-safe, and so this
+/// extends the object-safe [`ExtensionOptions`]
+pub trait ConfigExtension: ExtensionOptions {
+    /// Configuration namespace prefix to use
+    ///
+    /// All values under this will be prefixed with `$PREFIX + "."`
+    const PREFIX: &'static str;
+}
+
+/// An object-safe API for storing arbitrary configuration
+pub trait ExtensionOptions: Send + Sync + std::fmt::Debug + 'static {
+    /// Return `self` as [`Any`]
+    ///
+    /// This is needed until trait upcasting is stabilised
+    fn as_any(&self) -> &dyn Any;
+
+    /// Return `self` as [`Any`]
+    ///
+    /// This is needed until trait upcasting is stabilised
+    fn as_any_mut(&mut self) -> &mut dyn Any;
+
+    /// Return a deep clone of this [`ExtensionOptions`]
+    ///
+    /// It is important this does not share mutable state to avoid consistency issues
+    /// with configuration changing whilst queries are executing
+    fn cloned(&self) -> Box<dyn ExtensionOptions>;
+
+    /// Set the given `key`, `value` pair
+    fn set(&mut self, key: &str, value: &str) -> Result<()>;
 
-    /// get a u64 configuration option as a usize
-    pub fn get_usize(&self, key: &str) -> Option<usize> {
-        let v = get_conf_value!(self, UInt64, key, "usize");
-        v.and_then(|v| v.try_into().ok())
+    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
+    fn entries(&self) -> Vec<ConfigEntry>;
+}
+
+/// A type-safe container for [`ConfigExtension`]
+#[derive(Debug, Default, Clone)]
+pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
+
+impl Extensions {
+    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
+    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
+        assert_ne!(T::PREFIX, "datafusion");
+        let e = ExtensionBox(Box::new(extension));
+        self.0.insert(T::PREFIX, e);
     }
 
-    /// get a string configuration option
-    pub fn get_string(&self, key: &str) -> Option<String> {
-        get_conf_value!(self, Utf8, key, "string")
+    /// Retrieves the extension of the given type if any
+    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
+        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
     }
 
-    /// Access the underlying hashmap
-    pub fn options(&self) -> &HashMap<String, ScalarValue> {
-        &self.options
+    /// Retrieves the extension of the given type if any
+    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
+        let e = self.0.get_mut(T::PREFIX)?;
+        e.0.as_any_mut().downcast_mut()
     }
+}
+
+#[derive(Debug)]
+struct ExtensionBox(Box<dyn ExtensionOptions>);
 
-    /// Tests if the key exists in the configuration
-    pub fn exists(&self, key: &str) -> bool {
-        self.options().contains_key(key)
+impl Clone for ExtensionBox {
+    fn clone(&self) -> Self {
+        Self(self.0.cloned())
     }
 }
 
-#[cfg(test)]
-mod test {
-    use crate::config::{BuiltInConfigs, ConfigOptions};
-
-    #[test]
-    fn docs() {
-        let docs = BuiltInConfigs::generate_config_markdown();
-        let mut lines = docs.lines();
-        assert_eq!(
-            lines.next().unwrap(),
-            "| key | type | default | description |"
-        );
-        let configs = BuiltInConfigs::default();
-        for config in configs.config_definitions {
-            assert!(docs.contains(&config.key));
+/// A trait implemented by `config_namespace` and for field types that provides
+/// the ability to walk and mutate the configuration tree
+trait ConfigField {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
+
+    fn set(&mut self, key: &str, value: &str) -> Result<()>;
+}
+
+impl<F: ConfigField + Default> ConfigField for Option<F> {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
+        match self {
+            Some(s) => s.visit(v, key, description),
+            None => v.none(key, description),
         }
     }
 
-    #[test]
-    fn get_then_set() {
-        let mut config = ConfigOptions::new();
-        let config_key = "datafusion.optimizer.filter_null_join_keys";
-        assert!(!config.get_bool(config_key).unwrap_or_default());
-        config.set_bool(config_key, true);
-        assert!(config.get_bool(config_key).unwrap_or_default());
+    fn set(&mut self, key: &str, value: &str) -> Result<()> {
+        self.get_or_insert_with(Default::default).set(key, value)
     }
+}
 
-    #[test]
-    fn get_invalid_config() {
-        let config = ConfigOptions::new();
-        let invalid_key = "not.valid";
-        assert!(!config.exists(invalid_key));
-        assert!(!config.get_bool(invalid_key).unwrap_or_default());
-    }
+macro_rules! config_field {
+    ($t:ty) => {
+        impl ConfigField for $t {
+            fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
+                v.some(key, self, description)
+            }
+
+            fn set(&mut self, _: &str, value: &str) -> Result<()> {
+                *self = value.parse().map_err(|e| {
+                    DataFusionError::Context(
+                        format!(concat!("Error parsing {} as ", stringify!($t),), value),
+                        Box::new(DataFusionError::External(Box::new(e))),
+                    )
+                })?;
+                Ok(())
+            }
+        }
+    };
+}
 
-    #[test]
-    fn get_config_in_invalid_format() {
-        let config = ConfigOptions::new();
-        let key = "datafusion.execution.batch_size";
+config_field!(String);
+config_field!(bool);
+config_field!(usize);
 
-        assert!(config.exists(key));
-        assert_eq!(None, config.get_string(key));
-        assert!(!config.get_bool(key).unwrap_or_default());
-    }
+/// An implementation trait used to recursively walk configuration
+trait Visit {
+    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
+
+    fn none(&mut self, key: &str, description: &'static str);
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index cc9661e95..0d577cecf 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -40,9 +40,7 @@ use crate::arrow::array::{
 };
 use crate::arrow::datatypes::{DataType, Field};
 use crate::config::ConfigOptions;
-use crate::config::OPT_PARQUET_ENABLE_PRUNING;
-use crate::config::OPT_PARQUET_METADATA_SIZE_HINT;
-use crate::config::OPT_PARQUET_SKIP_METADATA;
+
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
@@ -87,8 +85,7 @@ impl ParquetFormat {
     /// Return true if pruning is enabled
     pub fn enable_pruning(&self, config_options: &ConfigOptions) -> bool {
         self.enable_pruning
-            .or_else(|| config_options.get_bool(OPT_PARQUET_ENABLE_PRUNING))
-            .unwrap_or(true)
+            .unwrap_or(config_options.execution.parquet.pruning)
     }
 
     /// Provide a hint to the size of the file metadata. If a hint is provided
@@ -104,8 +101,8 @@ impl ParquetFormat {
 
     /// Return the metadata size hint if set
     pub fn metadata_size_hint(&self, config_options: &ConfigOptions) -> Option<usize> {
-        self.metadata_size_hint
-            .or_else(|| config_options.get_usize(OPT_PARQUET_METADATA_SIZE_HINT))
+        let hint = config_options.execution.parquet.metadata_size_hint;
+        self.metadata_size_hint.or(hint)
     }
 
     /// Tell the parquet reader to skip any metadata that may be in
@@ -122,8 +119,7 @@ impl ParquetFormat {
     /// schema merging.
     pub fn skip_metadata(&self, config_options: &ConfigOptions) -> bool {
         self.skip_metadata
-            .or_else(|| config_options.get_bool(OPT_PARQUET_SKIP_METADATA))
-            .unwrap_or(true)
+            .unwrap_or(config_options.execution.parquet.skip_metadata)
     }
 }
 
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 4a49e8ec7..1e52c84ac 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -18,11 +18,6 @@
 //! SessionContext contains methods for registering data sources and executing queries
 use crate::{
     catalog::catalog::{CatalogList, MemoryCatalogList},
-    config::{
-        OPT_COLLECT_STATISTICS, OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA,
-        OPT_INFORMATION_SCHEMA, OPT_PARQUET_ENABLE_PRUNING, OPT_REPARTITION_AGGREGATIONS,
-        OPT_REPARTITION_JOINS, OPT_REPARTITION_WINDOWS, OPT_TARGET_PARTITIONS,
-    },
     datasource::listing::{ListingOptions, ListingTable},
     datasource::{MemTable, ViewTable},
     logical_expr::{PlanType, ToStringifiedPlan},
@@ -35,7 +30,6 @@ use crate::{
 pub use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::var_provider::is_system_variables;
 use parking_lot::RwLock;
-use std::str::FromStr;
 use std::sync::Arc;
 use std::{
     any::{Any, TypeId},
@@ -71,11 +65,7 @@ use datafusion_sql::{ResolvedTableReference, TableReference};
 use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
 use crate::physical_optimizer::repartition::Repartition;
 
-use crate::config::{
-    ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES,
-    OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS,
-    OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
-};
+use crate::config::ConfigOptions;
 use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
 use crate::physical_optimizer::enforcement::BasicEnforcement;
 use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -374,48 +364,8 @@ impl SessionContext {
                 variable, value, ..
             }) => {
                 let mut state = self.state.write();
-                let config_options = &mut state.config.config_options;
-
-                let old_value = config_options.get(&variable).ok_or_else(|| {
-                    DataFusionError::Execution(format!(
-                        "Can not SET variable: Unknown Variable {variable}"
-                    ))
-                })?;
-
-                match old_value {
-                    ScalarValue::Boolean(_) => {
-                        let new_value = value.parse::<bool>().map_err(|_| {
-                            DataFusionError::Execution(format!(
-                                "Failed to parse {value} as bool",
-                            ))
-                        })?;
-                        config_options.set_bool(&variable, new_value);
-                    }
-
-                    ScalarValue::UInt64(_) => {
-                        let new_value = value.parse::<u64>().map_err(|_| {
-                            DataFusionError::Execution(format!(
-                                "Failed to parse {value} as u64",
-                            ))
-                        })?;
-                        config_options.set_u64(&variable, new_value);
-                    }
-
-                    ScalarValue::Utf8(_) => {
-                        let new_value = value.parse::<String>().map_err(|_| {
-                            DataFusionError::Execution(format!(
-                                "Failed to parse {value} as String",
-                            ))
-                        })?;
-                        config_options.set_string(&variable, new_value);
-                    }
-
-                    _ => {
-                        return Err(DataFusionError::Execution(
-                            "Unsupported Scalar Value Type".to_string(),
-                        ))
-                    }
-                }
+                let config_options = &mut state.config.options;
+                config_options.set(&variable, &value)?;
                 drop(state);
 
                 self.return_empty_dataframe()
@@ -1153,7 +1103,7 @@ pub struct SessionConfig {
     /// due to `resolve_table_ref` which passes back references)
     default_schema: String,
     /// Configuration options
-    config_options: ConfigOptions,
+    options: ConfigOptions,
     /// Opaque extensions.
     extensions: AnyMap,
 }
@@ -1163,7 +1113,7 @@ impl Default for SessionConfig {
         Self {
             default_catalog: DEFAULT_CATALOG.to_owned(),
             default_schema: DEFAULT_SCHEMA.to_owned(),
-            config_options: ConfigOptions::new(),
+            options: ConfigOptions::new(),
             // Assume no extensions by default.
             extensions: HashMap::with_capacity_and_hasher(
                 0,
@@ -1180,16 +1130,13 @@ impl SessionConfig {
     }
 
     /// Create an execution config with config options read from the environment
-    pub fn from_env() -> Self {
-        Self {
-            config_options: ConfigOptions::from_env(),
-            ..Default::default()
-        }
+    pub fn from_env() -> Result<Self> {
+        Ok(ConfigOptions::from_env()?.into())
     }
 
     /// Set a configuration option
     pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
-        self.config_options.set(key, value);
+        self.options.set(key, &value.to_string()).unwrap();
         self
     }
 
@@ -1215,66 +1162,54 @@ impl SessionConfig {
     }
 
     /// Customize batch size
-    pub fn with_batch_size(self, n: usize) -> Self {
+    pub fn with_batch_size(mut self, n: usize) -> Self {
         // batch size must be greater than zero
         assert!(n > 0);
-        self.set_u64(OPT_BATCH_SIZE, n.try_into().unwrap())
+        self.options.execution.batch_size = n;
+        self
     }
 
     /// Customize [`OPT_TARGET_PARTITIONS`]
-    pub fn with_target_partitions(self, n: usize) -> Self {
+    pub fn with_target_partitions(mut self, n: usize) -> Self {
         // partition count must be greater than zero
         assert!(n > 0);
-        self.set_usize(OPT_TARGET_PARTITIONS, n)
+        self.options.execution.target_partitions = n;
+        self
     }
 
     /// get target_partitions
     pub fn target_partitions(&self) -> usize {
-        self.config_options
-            .get_usize(OPT_TARGET_PARTITIONS)
-            .expect("target partitions must be set")
+        self.options.execution.target_partitions
     }
 
     /// Is the information schema enabled?
     pub fn information_schema(&self) -> bool {
-        self.config_options
-            .get_bool(OPT_INFORMATION_SCHEMA)
-            .unwrap_or_default()
+        self.options.catalog.information_schema
     }
 
     /// Should the context create the default catalog and schema?
     pub fn create_default_catalog_and_schema(&self) -> bool {
-        self.config_options
-            .get_bool(OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA)
-            .unwrap_or_default()
+        self.options.catalog.create_default_catalog_and_schema
     }
 
     /// Are joins repartitioned during execution?
     pub fn repartition_joins(&self) -> bool {
-        self.config_options
-            .get_bool(OPT_REPARTITION_JOINS)
-            .unwrap_or_default()
+        self.options.optimizer.repartition_joins
     }
 
     /// Are aggregates repartitioned during execution?
     pub fn repartition_aggregations(&self) -> bool {
-        self.config_options
-            .get_bool(OPT_REPARTITION_AGGREGATIONS)
-            .unwrap_or_default()
+        self.options.optimizer.repartition_aggregations
     }
 
     /// Are window functions repartitioned during execution?
     pub fn repartition_window_functions(&self) -> bool {
-        self.config_options
-            .get_bool(OPT_REPARTITION_WINDOWS)
-            .unwrap_or_default()
+        self.options.optimizer.repartition_windows
     }
 
     /// Are statistics collected during execution?
     pub fn collect_statistics(&self) -> bool {
-        self.config_options
-            .get_bool(OPT_COLLECT_STATISTICS)
-            .unwrap_or_default()
+        self.options.execution.collect_statistics
     }
 
     /// Selects a name for the default catalog and schema
@@ -1290,109 +1225,85 @@ impl SessionConfig {
 
     /// Controls whether the default catalog and schema will be automatically created
     pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
-        self.config_options
-            .set_bool(OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA, create);
+        self.options.catalog.create_default_catalog_and_schema = create;
         self
     }
 
     /// Enables or disables the inclusion of `information_schema` virtual tables
     pub fn with_information_schema(mut self, enabled: bool) -> Self {
-        self.config_options
-            .set_bool(OPT_INFORMATION_SCHEMA, enabled);
+        self.options.catalog.information_schema = enabled;
         self
     }
 
     /// Enables or disables the use of repartitioning for joins to improve parallelism
     pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
-        self.config_options.set_bool(OPT_REPARTITION_JOINS, enabled);
+        self.options.optimizer.repartition_joins = enabled;
         self
     }
 
     /// Enables or disables the use of repartitioning for aggregations to improve parallelism
     pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
-        self.config_options
-            .set_bool(OPT_REPARTITION_AGGREGATIONS, enabled);
+        self.options.optimizer.repartition_aggregations = enabled;
         self
     }
 
     /// Enables or disables the use of repartitioning for window functions to improve parallelism
     pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
-        self.config_options
-            .set_bool(OPT_REPARTITION_WINDOWS, enabled);
+        self.options.optimizer.repartition_windows = enabled;
         self
     }
 
     /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
     pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
-        self.config_options
-            .set_bool(OPT_PARQUET_ENABLE_PRUNING, enabled);
+        self.options.execution.parquet.pruning = enabled;
         self
     }
 
     /// Returns true if pruning predicate should be used to skip parquet row groups
     pub fn parquet_pruning(&self) -> bool {
-        self.config_options
-            .get_bool(OPT_PARQUET_ENABLE_PRUNING)
-            .unwrap_or(false)
+        self.options.execution.parquet.pruning
     }
 
     /// Enables or disables the collection of statistics after listing files
     pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
-        self.config_options
-            .set_bool(OPT_COLLECT_STATISTICS, enabled);
+        self.options.execution.collect_statistics = enabled;
         self
     }
 
     /// Get the currently configured batch size
     pub fn batch_size(&self) -> usize {
-        self.config_options
-            .get_u64(OPT_BATCH_SIZE)
-            .unwrap_or_default()
-            .try_into()
-            .unwrap()
+        self.options.execution.batch_size
     }
 
-    /// Enables or disables the coalescence of small batches into larger batches
-    pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
-        self.config_options.set_bool(OPT_COALESCE_BATCHES, enabled);
-        self
-    }
-
-    /// Returns true if record batches will be examined between each operator
-    /// and small batches will be coalesced into larger batches.
-    pub fn coalesce_batches(&self) -> bool {
-        self.config_options
-            .get_bool(OPT_COALESCE_BATCHES)
-            .unwrap_or_default()
-    }
-
-    /// Enables or disables the round robin repartition for increasing parallelism
-    pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
-        self.config_options
-            .set_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION, enabled);
-        self
-    }
+    /// Convert configuration options to name-value pairs with values
+    /// converted to strings.
+    ///
+    /// Note that this method will eventually be deprecated and
+    /// replaced by [`config_options`].
+    ///
+    /// [`config_options`]: SessionContext::config_option
+    pub fn to_props(&self) -> HashMap<String, String> {
+        let mut map = HashMap::new();
+        // copy configs from config_options
+        for entry in self.options.entries() {
+            map.insert(entry.key, entry.value.unwrap_or_default());
+        }
 
-    /// Returns true if the physical plan optimizer will try to
-    /// add round robin repartition to increase parallelism to leverage more CPU cores.
-    pub fn round_robin_repartition(&self) -> bool {
-        self.config_options
-            .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
-            .unwrap_or_default()
+        map
     }
 
     /// Return a handle to the configuration options.
     ///
     /// [`config_options`]: SessionContext::config_option
     pub fn config_options(&self) -> &ConfigOptions {
-        &self.config_options
+        &self.options
     }
 
     /// Return a mutable handle to the configuration options.
     ///
     /// [`config_options`]: SessionContext::config_option
     pub fn config_options_mut(&mut self) -> &mut ConfigOptions {
-        &mut self.config_options
+        &mut self.options
     }
 
     /// Add extensions.
@@ -1464,6 +1375,15 @@ impl SessionConfig {
     }
 }
 
+impl From<ConfigOptions> for SessionConfig {
+    fn from(options: ConfigOptions) -> Self {
+        Self {
+            options,
+            ..Default::default()
+        }
+    }
+}
+
 /// Execution context for registering data sources and executing queries
 #[derive(Clone)]
 pub struct SessionState {
@@ -1530,6 +1450,7 @@ impl SessionState {
         // We need to take care of the rule ordering. They may influence each other.
         let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> =
             vec![Arc::new(AggregateStatistics::new())];
+
         // - In order to increase the parallelism, it will change the output partitioning
         // of some operators in the plan tree, which will influence other rules.
         // Therefore, it should be run as soon as possible.
@@ -1538,7 +1459,7 @@ impl SessionState {
         //      - it's conflicted with some parts of the BasicEnforcement, since it will
         //      introduce additional repartitioning while the BasicEnforcement aims at
         //      reducing unnecessary repartitioning.
-        if config.round_robin_repartition() {
+        if config.options.optimizer.enable_round_robin_repartition {
             physical_optimizers.push(Arc::new(Repartition::new()));
         }
         //- Currently it will depend on the partition number to decide whether to change the
@@ -1570,8 +1491,10 @@ impl SessionState {
         physical_optimizers.push(Arc::new(OptimizeSorts::new()));
         // It will not influence the distribution and ordering of the whole plan tree.
         // Therefore, to avoid influencing other rules, it should be run at last.
-        if config.coalesce_batches() {
-            physical_optimizers.push(Arc::new(CoalesceBatches::new(config.batch_size())));
+        if config.options.execution.coalesce_batches {
+            physical_optimizers.push(Arc::new(CoalesceBatches::new(
+                config.options.execution.batch_size,
+            )));
         }
         // The PipelineChecker rule will reject non-runnable query plans that use
         // pipeline-breaking operators on infinite input(s). The rule generates a
@@ -1597,23 +1520,16 @@ impl SessionState {
         runtime: &Arc<RuntimeEnv>,
         default_catalog: &MemoryCatalogProvider,
     ) {
-        let url = config.config_options.get("datafusion.catalog.location");
-        let format = config.config_options.get("datafusion.catalog.type");
+        let url = config.options.catalog.location.as_ref();
+        let format = config.options.catalog.format.as_ref();
         let (url, format) = match (url, format) {
             (Some(url), Some(format)) => (url, format),
             _ => return,
         };
-        if url.is_null() || format.is_null() {
-            return;
-        }
         let url = url.to_string();
         let format = format.to_string();
 
-        let has_header = config.config_options.get("datafusion.catalog.has_header");
-        let has_header: bool = has_header
-            .map(|x| FromStr::from_str(&x.to_string()).unwrap_or_default())
-            .unwrap_or_default();
-
+        let has_header = config.options.catalog.has_header;
         let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
         let authority = match url.host_str() {
             Some(host) => format!("{}://{}", url.scheme(), host),
@@ -1730,26 +1646,14 @@ impl SessionState {
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
         // TODO: Implement OptimizerContext directly on DataFrame (#4631) (#4626)
         let config = {
-            let config_options = self.config_options();
+            let config = &self.config_options().optimizer;
             OptimizerContext::new()
-                .with_skip_failing_rules(
-                    config_options
-                        .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES)
-                        .unwrap_or_default(),
-                )
-                .with_max_passes(
-                    config_options
-                        .get_u64(OPT_OPTIMIZER_MAX_PASSES)
-                        .unwrap_or_default() as u8,
-                )
+                .with_skip_failing_rules(config.skip_failed_rules)
+                .with_max_passes(config.max_passes as u8)
                 .with_query_execution_start_time(
                     self.execution_props.query_execution_start_time,
                 )
-                .filter_null_keys(
-                    config_options
-                        .get_bool(OPT_FILTER_NULL_JOIN_KEYS)
-                        .unwrap_or_default(),
-                )
+                .filter_null_keys(config.filter_null_join_keys)
         };
 
         if let LogicalPlan::Explain(e) = plan {
@@ -1867,7 +1771,17 @@ impl ContextProvider for SessionState {
     }
 
     fn get_config_option(&self, variable: &str) -> Option<ScalarValue> {
-        self.config_options().get(variable)
+        // TOOD: Move ConfigOptions into common crate
+        match variable {
+            "datafusion.execution.time_zone" => self
+                .config
+                .options
+                .execution
+                .time_zone
+                .as_ref()
+                .map(|s| ScalarValue::Utf8(Some(s.clone()))),
+            _ => unimplemented!(),
+        }
     }
 }
 
@@ -1923,59 +1837,15 @@ impl TaskContext {
         aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
         runtime: Arc<RuntimeEnv>,
     ) -> Self {
-        let session_config = if task_props.is_empty() {
-            SessionConfig::new()
-        } else {
-            SessionConfig::new()
-                .with_batch_size(task_props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap())
-                .with_target_partitions(
-                    task_props
-                        .get(OPT_TARGET_PARTITIONS)
-                        .unwrap()
-                        .parse()
-                        .unwrap(),
-                )
-                .with_repartition_joins(
-                    task_props
-                        .get(OPT_REPARTITION_JOINS)
-                        .unwrap()
-                        .parse()
-                        .unwrap(),
-                )
-                .with_repartition_aggregations(
-                    task_props
-                        .get(OPT_REPARTITION_AGGREGATIONS)
-                        .unwrap()
-                        .parse()
-                        .unwrap(),
-                )
-                .with_repartition_windows(
-                    task_props
-                        .get(OPT_REPARTITION_WINDOWS)
-                        .unwrap()
-                        .parse()
-                        .unwrap(),
-                )
-                .with_parquet_pruning(
-                    task_props
-                        .get(OPT_PARQUET_ENABLE_PRUNING)
-                        .unwrap()
-                        .parse()
-                        .unwrap(),
-                )
-                .with_collect_statistics(
-                    task_props
-                        .get(OPT_COLLECT_STATISTICS)
-                        .unwrap()
-                        .parse()
-                        .unwrap(),
-                )
-        };
+        let mut config = ConfigOptions::new();
+        for (k, v) in task_props {
+            let _ = config.set(&k, &v);
+        }
 
         Self {
             task_id: Some(task_id),
             session_id,
-            session_config,
+            session_config: config.into(),
             scalar_functions,
             aggregate_functions,
             runtime,
@@ -2329,7 +2199,7 @@ mod tests {
         let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
         let cfg = SessionConfig::new()
             .set_str("datafusion.catalog.location", url.as_str())
-            .set_str("datafusion.catalog.type", "CSV")
+            .set_str("datafusion.catalog.format", "CSV")
             .set_str("datafusion.catalog.has_header", "true");
         let session_state = SessionState::with_config_rt(cfg, runtime);
         let ctx = SessionContext::with_state(session_state);
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 7c55a81aa..0b8206a78 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -18,9 +18,7 @@
 //! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
 //! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
 //!
-use crate::config::{
-    ConfigOptions, OPT_TARGET_PARTITIONS, OPT_TOP_DOWN_JOIN_KEY_REORDERING,
-};
+use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
 use crate::physical_optimizer::PhysicalOptimizerRule;
@@ -72,10 +70,8 @@ impl PhysicalOptimizerRule for BasicEnforcement {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
-        let top_down_join_key_reordering = config
-            .get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING)
-            .unwrap_or_default();
+        let target_partitions = config.execution.target_partitions;
+        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
         let new_plan = if top_down_join_key_reordering {
             // Run a top-down process to adjust input key ordering recursively
             let plan_requirements = PlanWithKeyRequirements::new(plan);
@@ -1137,7 +1133,7 @@ mod tests {
             let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
             let mut config = ConfigOptions::new();
-            config.set_usize(OPT_TARGET_PARTITIONS, 10);
+            config.execution.target_partitions = 10;
 
             // run optimizer
             let optimizer = BasicEnforcement {};
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index b378e3beb..2589fa625 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::Schema;
 
-use crate::config::{ConfigOptions, OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD};
+use crate::config::ConfigOptions;
 use crate::logical_expr::JoinType;
 use crate::physical_plan::expressions::Column;
 use crate::physical_plan::joins::{
@@ -214,11 +214,8 @@ impl PhysicalOptimizerRule for JoinSelection {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let collect_left_threshold: usize = config
-            .get_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD)
-            .unwrap_or_default()
-            .try_into()
-            .unwrap();
+        let config = &config.optimizer;
+        let collect_left_threshold = config.hash_join_single_partition_threshold;
         plan.transform_up(&|plan| {
             if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
                 match hash_join.partition_mode() {
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 5bdbf59a6..9320de41e 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -19,7 +19,7 @@
 use std::sync::Arc;
 
 use super::optimizer::PhysicalOptimizerRule;
-use crate::config::{ConfigOptions, OPT_TARGET_PARTITIONS};
+use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_plan::Partitioning::*;
 use crate::physical_plan::{
@@ -210,7 +210,7 @@ impl PhysicalOptimizerRule for Repartition {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
+        let target_partitions = config.execution.target_partitions;
         // Don't run optimizer if target_partitions == 1
         if target_partitions == 1 {
             Ok(plan)
@@ -364,7 +364,7 @@ mod tests {
             let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
             let mut config = ConfigOptions::new();
-            config.set_usize(OPT_TARGET_PARTITIONS, 10);
+            config.execution.target_partitions = 10;
 
             // run optimizer
             let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index b229f3aa2..b50ac08ba 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -308,18 +308,21 @@ pub fn concat_batches(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::config::OPT_COALESCE_BATCHES;
+    use crate::config::ConfigOptions;
     use crate::datasource::MemTable;
     use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::projection::ProjectionExec;
     use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
-    use crate::prelude::{SessionConfig, SessionContext};
+    use crate::prelude::SessionContext;
     use crate::test::create_vec_batches;
     use arrow::datatypes::{DataType, Field, Schema};
 
     #[tokio::test]
     async fn test_custom_batch_size() -> Result<()> {
-        let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(1234));
+        let mut config = ConfigOptions::new();
+        config.execution.batch_size = 1234;
+
+        let ctx = SessionContext::with_config(config.into());
         let plan = create_physical_plan(ctx).await?;
         let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
         let coalesce = projection
@@ -333,9 +336,10 @@ mod tests {
 
     #[tokio::test]
     async fn test_disable_coalesce() -> Result<()> {
-        let ctx = SessionContext::with_config(
-            SessionConfig::new().set_bool(OPT_COALESCE_BATCHES, false),
-        );
+        let mut config = ConfigOptions::new();
+        config.execution.coalesce_batches = false;
+
+        let ctx = SessionContext::with_config(config.into());
         let plan = create_physical_plan(ctx).await?;
         let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
         // projection should directly wrap filter with no coalesce step
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index c3e74ad22..d14c9235b 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -25,9 +25,7 @@ use std::fs;
 use std::ops::Range;
 use std::sync::Arc;
 
-use crate::config::OPT_PARQUET_PUSHDOWN_FILTERS;
-use crate::config::OPT_PARQUET_REORDER_FILTERS;
-use crate::config::{ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX};
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::parquet::fetch_parquet_metadata;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
@@ -202,9 +200,7 @@ impl ParquetExec {
     /// Return the value described in [`Self::with_pushdown_filters`]
     fn pushdown_filters(&self, config_options: &ConfigOptions) -> bool {
         self.pushdown_filters
-            .or_else(|| config_options.get_bool(OPT_PARQUET_PUSHDOWN_FILTERS))
-            // default to false
-            .unwrap_or_default()
+            .unwrap_or(config_options.execution.parquet.pushdown_filters)
     }
 
     /// If true, the `RowFilter` made by `pushdown_filters` may try to
@@ -219,9 +215,7 @@ impl ParquetExec {
     /// Return the value described in [`Self::with_reorder_filters`]
     fn reorder_filters(&self, config_options: &ConfigOptions) -> bool {
         self.reorder_filters
-            .or_else(|| config_options.get_bool(OPT_PARQUET_REORDER_FILTERS))
-            // default to false
-            .unwrap_or_default()
+            .unwrap_or(config_options.execution.parquet.reorder_filters)
     }
 
     /// If enabled, the reader will read the page index
@@ -236,9 +230,7 @@ impl ParquetExec {
     /// Return the value described in [`Self::with_enable_page_index`]
     fn enable_page_index(&self, config_options: &ConfigOptions) -> bool {
         self.enable_page_index
-            .or_else(|| config_options.get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX))
-            // default to false
-            .unwrap_or_default()
+            .unwrap_or(config_options.execution.parquet.enable_page_index)
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index cad2167c6..978ed195b 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -22,9 +22,6 @@ use super::{
     aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
     values::ValuesExec, windows,
 };
-use crate::config::{
-    OPT_EXPLAIN_LOGICAL_PLAN_ONLY, OPT_EXPLAIN_PHYSICAL_PLAN_ONLY, OPT_PREFER_HASH_JOIN,
-};
 use crate::datasource::source_as_provider;
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
@@ -1002,9 +999,7 @@ impl DefaultPhysicalPlanner {
                         _ => None
                     };
 
-                    let prefer_hash_join = session_state.config().config_options()
-                        .get_bool(OPT_PREFER_HASH_JOIN)
-                        .unwrap_or_default();
+                    let prefer_hash_join = session_state.config_options().optimizer.prefer_hash_join;
                     if join_on.is_empty() {
                         // there is no equal join condition, use the nested loop join
                         // TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins`
@@ -1720,21 +1715,14 @@ impl DefaultPhysicalPlanner {
             use PlanType::*;
             let mut stringified_plans = vec![];
 
-            if !session_state
-                .config_options()
-                .get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY)
-                .unwrap_or_default()
-            {
-                stringified_plans = e.stringified_plans.clone();
+            let config = &session_state.config_options().explain;
 
+            if !config.physical_plan_only {
+                stringified_plans = e.stringified_plans.clone();
                 stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
             }
 
-            if !session_state
-                .config_options()
-                .get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY)
-                .unwrap_or_default()
-            {
+            if !config.logical_plan_only {
                 let input = self
                     .create_initial_plan(e.plan.as_ref(), session_state)
                     .await?;
diff --git a/datafusion/core/tests/config_from_env.rs b/datafusion/core/tests/config_from_env.rs
index 3f1fcbd60..a420f5c9f 100644
--- a/datafusion/core/tests/config_from_env.rs
+++ b/datafusion/core/tests/config_from_env.rs
@@ -19,30 +19,27 @@ use datafusion::config::ConfigOptions;
 use std::env;
 
 #[test]
-fn get_config_bool_from_env() {
-    let config_key = "datafusion.optimizer.filter_null_join_keys";
+fn from_env() {
+    // Note: these must be a single test to avoid interference from concurrent execution
     let env_key = "DATAFUSION_OPTIMIZER_FILTER_NULL_JOIN_KEYS";
     env::set_var(env_key, "true");
-    let config = ConfigOptions::from_env();
+    let config = ConfigOptions::from_env().unwrap();
     env::remove_var(env_key);
-    assert!(config.get_bool(config_key).unwrap_or_default());
-}
+    assert!(config.optimizer.filter_null_join_keys);
 
-#[test]
-fn get_config_int_from_env() {
-    let config_key = "datafusion.execution.batch_size";
     let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE";
 
     // for valid testing
     env::set_var(env_key, "4096");
-    let config = ConfigOptions::from_env();
-    assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096);
+    let config = ConfigOptions::from_env().unwrap();
+    assert_eq!(config.execution.batch_size, 4096);
 
     // for invalid testing
     env::set_var(env_key, "abc");
-    let config = ConfigOptions::from_env();
-    assert_eq!(config.get_u64(config_key).unwrap_or_default(), 8192); // set to its default value
+    let err = ConfigOptions::from_env().unwrap_err().to_string();
+    assert_eq!(err, "Error parsing abc as usize\ncaused by\nExternal error: invalid digit found in string");
 
-    // To avoid influence other testing, we need to clear this environment variable
     env::remove_var(env_key);
+    let config = ConfigOptions::from_env().unwrap();
+    assert_eq!(config.execution.batch_size, 8192); // set to its default value
 }
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index ad5c0d09d..7b524454d 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -85,12 +85,7 @@ mod unix_test {
         fifo_path: &Path,
         with_unbounded_execution: bool,
     ) -> Result<SessionContext> {
-        let config = SessionConfig::new()
-            .with_batch_size(TEST_BATCH_SIZE)
-            .set_u64(
-                "datafusion.execution.coalesce_target_batch_size",
-                TEST_BATCH_SIZE as u64,
-            );
+        let config = SessionConfig::new().with_batch_size(TEST_BATCH_SIZE);
         let ctx = SessionContext::with_config(config);
         // Register left table
         let left_schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 93b276210..123095186 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -28,7 +28,6 @@ use arrow::{
     util::pretty::pretty_format_batches,
 };
 use chrono::{Datelike, Duration};
-use datafusion::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
 use datafusion::{
     datasource::{provider_as_source, TableProvider},
     physical_plan::{
@@ -140,9 +139,8 @@ impl ContextWithParquet {
         let file = match unit {
             Unit::RowGroup => make_test_file_rg(scenario).await,
             Unit::Page => {
-                config
-                    .config_options_mut()
-                    .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, true);
+                let config = config.config_options_mut();
+                config.execution.parquet.enable_page_index = true;
                 make_test_file_page(scenario).await
             }
         };
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 82a24a04b..2d00b26df 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -16,10 +16,8 @@
 // under the License.
 
 use super::*;
-use datafusion::{
-    config::{OPT_EXPLAIN_LOGICAL_PLAN_ONLY, OPT_EXPLAIN_PHYSICAL_PLAN_ONLY},
-    physical_plan::display::DisplayableExecutionPlan,
-};
+use datafusion::config::ConfigOptions;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
 
 #[tokio::test]
 async fn explain_analyze_baseline_metrics() {
@@ -856,8 +854,9 @@ async fn csv_explain_analyze_verbose() {
 
 #[tokio::test]
 async fn explain_logical_plan_only() {
-    let config = SessionConfig::new().set_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY, true);
-    let ctx = SessionContext::with_config(config);
+    let mut config = ConfigOptions::new();
+    config.explain.logical_plan_only = true;
+    let ctx = SessionContext::with_config(config.into());
     let sql = "EXPLAIN select count(*) from (values ('a', 1, 100), ('a', 2, 150)) as t (c1,c2,c3)";
     let actual = execute(&ctx, sql).await;
     let actual = normalize_vec_for_explain(actual);
@@ -875,8 +874,9 @@ async fn explain_logical_plan_only() {
 
 #[tokio::test]
 async fn explain_physical_plan_only() {
-    let config = SessionConfig::new().set_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY, true);
-    let ctx = SessionContext::with_config(config);
+    let mut config = ConfigOptions::new();
+    config.explain.physical_plan_only = true;
+    let ctx = SessionContext::with_config(config.into());
     let sql = "EXPLAIN select count(*) from (values ('a', 1, 100), ('a', 2, 150)) as t (c1,c2,c3)";
     let actual = execute(&ctx, sql).await;
     let actual = normalize_vec_for_explain(actual);
@@ -894,9 +894,9 @@ async fn explain_physical_plan_only() {
 #[tokio::test]
 async fn explain_nested() {
     async fn test_nested_explain(explain_phy_plan_flag: bool) {
-        let config = SessionConfig::new()
-            .set_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY, explain_phy_plan_flag);
-        let ctx = SessionContext::with_config(config);
+        let mut config = ConfigOptions::new();
+        config.explain.physical_plan_only = explain_phy_plan_flag;
+        let ctx = SessionContext::with_config(config.into());
         let sql = "EXPLAIN explain select 1";
         let err = ctx.sql(sql).await.unwrap_err();
         assert!(err.to_string().contains("Explain must be root of the plan"));
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index eaf4603dd..445950e1d 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -25,7 +25,7 @@ use arrow::{
 use chrono::prelude::*;
 use chrono::Duration;
 
-use datafusion::config::OPT_PREFER_HASH_JOIN;
+use datafusion::config::ConfigOptions;
 use datafusion::datasource::TableProvider;
 use datafusion::from_slice::FromSlice;
 use datafusion::logical_expr::{Aggregate, LogicalPlan, Projection, TableScan};
@@ -570,9 +570,10 @@ fn create_sort_merge_join_context(
     column_left: &str,
     column_right: &str,
 ) -> Result<SessionContext> {
-    let ctx = SessionContext::with_config(
-        SessionConfig::new().set_bool(OPT_PREFER_HASH_JOIN, false),
-    );
+    let mut config = ConfigOptions::new();
+    config.optimizer.prefer_hash_join = false;
+
+    let ctx = SessionContext::with_config(config.into());
 
     let t1_schema = Arc::new(Schema::new(vec![
         Field::new(column_left, DataType::UInt32, true),
@@ -618,12 +619,12 @@ fn create_sort_merge_join_context(
 }
 
 fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
-    let ctx = SessionContext::with_config(
-        SessionConfig::new()
-            .set_bool(OPT_PREFER_HASH_JOIN, false)
-            .with_target_partitions(2)
-            .with_batch_size(4096),
-    );
+    let mut config = ConfigOptions::new();
+    config.optimizer.prefer_hash_join = false;
+    config.execution.target_partitions = 2;
+    config.execution.batch_size = 4096;
+
+    let ctx = SessionContext::with_config(config.into());
 
     let t1_schema = Schema::new(vec![
         Field::new("c1", DataType::Date32, true),
diff --git a/datafusion/core/tests/sql/set_variable.rs b/datafusion/core/tests/sql/set_variable.rs
index 085e7b1f4..6a85f7df1 100644
--- a/datafusion/core/tests/sql/set_variable.rs
+++ b/datafusion/core/tests/sql/set_variable.rs
@@ -110,7 +110,7 @@ async fn set_variable_unknown_variable() {
         .unwrap_err();
     assert_eq!(
         err.to_string(),
-        "Execution error: Can not SET variable: Unknown Variable aabbcc"
+        "External error: could not find config namespace for key \"aabbcc\""
     );
 }
 
@@ -161,7 +161,7 @@ async fn set_bool_variable_bad_value() {
 
     assert_eq!(
         err.to_string(),
-        "Execution error: Failed to parse 1 as bool"
+        "Error parsing 1 as bool\ncaused by\nExternal error: provided string was not `true` or `false`"
     );
 
     let err = plan_and_collect(&ctx, "SET datafusion.execution.coalesce_batches to abc")
@@ -170,7 +170,7 @@ async fn set_bool_variable_bad_value() {
 
     assert_eq!(
         err.to_string(),
-        "Execution error: Failed to parse abc as bool"
+        "Error parsing abc as bool\ncaused by\nExternal error: provided string was not `true` or `false`"
     );
 }
 
@@ -236,7 +236,7 @@ async fn set_u64_variable_bad_value() {
 
     assert_eq!(
         err.to_string(),
-        "Execution error: Failed to parse -1 as u64"
+        "Error parsing -1 as usize\ncaused by\nExternal error: invalid digit found in string"
     );
 
     let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to abc")
@@ -245,7 +245,7 @@ async fn set_u64_variable_bad_value() {
 
     assert_eq!(
         err.to_string(),
-        "Execution error: Failed to parse abc as u64"
+        "Error parsing abc as usize\ncaused by\nExternal error: invalid digit found in string"
     );
 
     let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to 0.1")
@@ -254,7 +254,7 @@ async fn set_u64_variable_bad_value() {
 
     assert_eq!(
         err.to_string(),
-        "Execution error: Failed to parse 0.1 as u64"
+        "Error parsing 0.1 as usize\ncaused by\nExternal error: invalid digit found in string"
     );
 }
 
diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 8eaa7dd43..2d663d33b 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -103,9 +103,10 @@ query R
 SHOW ALL
 ----
 datafusion.catalog.create_default_catalog_and_schema true
+datafusion.catalog.format NULL
+datafusion.catalog.has_header false
 datafusion.catalog.information_schema true
 datafusion.catalog.location NULL
-datafusion.catalog.type NULL
 datafusion.execution.batch_size 8192
 datafusion.execution.coalesce_batches true
 datafusion.execution.collect_statistics false
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 79245ca1a..cfc2ddc18 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -35,32 +35,33 @@ Values are parsed according to the [same rules used in casts from Utf8](https://
 If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted.
 Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions.
 
-| key                                                       | type    | default | description                                                                                                                                                                                                                                                                                                                                   |
-| --------------------------------------------------------- | ------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| datafusion.catalog.create_default_catalog_and_schema      | Boolean | true    | Whether the default catalog and schema should be created automatically.                                                                                                                                                                                                                                                                       |
-| datafusion.catalog.information_schema                     | Boolean | false   | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information                                                                                                                                                                                                                                     |
-| datafusion.catalog.location                               | Utf8    | NULL    | Location scanned to load tables for `default` schema, defaults to None                                                                                                                                                                                                                                                                        |
-| datafusion.catalog.type                                   | Utf8    | NULL    | Type of `TableProvider` to use when loading `default` schema. Defaults to None                                                                                                                                                                                                                                                                |
-| datafusion.execution.batch_size                           | UInt64  | 8192    | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption.                                                                                                                                                         |
-| datafusion.execution.coalesce_batches                     | Boolean | true    | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.batch_size'. |
-| datafusion.execution.collect_statistics                   | Boolean | false   | Should DataFusion collect statistics after listing files                                                                                                                                                                                                                                                                                      |
-| datafusion.execution.parquet.enable_page_index            | Boolean | false   | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                                                                  |
-| datafusion.execution.parquet.metadata_size_hint           | UInt64  | NULL    | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two read are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer.                                                                       |
-| datafusion.execution.parquet.pruning                      | Boolean | true    | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file.                                                                                                                                                                              |
-| datafusion.execution.parquet.pushdown_filters             | Boolean | false   | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded.                                                                                                                                                                                                                        |
-| datafusion.execution.parquet.reorder_filters              | Boolean | false   | If true, filter expressions evaluated during the parquet decoding opearation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query.                                                                                                                 |
-| datafusion.execution.parquet.skip_metadata                | Boolean | true    | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata.                                                                                            |
-| datafusion.execution.target_partitions                    | UInt64  | 0       | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system.                                                                                                                                                                                                  |
-| datafusion.execution.time_zone                            | Utf8    | +00:00  | The session time zone which some function require e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, then extract the hour.                                                                                                                                                                           |
-| datafusion.explain.logical_plan_only                      | Boolean | false   | When set to true, the explain statement will only print logical plans.                                                                                                                                                                                                                                                                        |
-| datafusion.explain.physical_plan_only                     | Boolean | false   | When set to true, the explain statement will only print physical plans.                                                                                                                                                                                                                                                                       |
-| datafusion.optimizer.enable_round_robin_repartition       | Boolean | true    | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores                                                                                                                                                                                                      |
-| datafusion.optimizer.filter_null_join_keys                | Boolean | false   | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                                                               |
-| datafusion.optimizer.hash_join_single_partition_threshold | UInt64  | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                                                                |
-| datafusion.optimizer.max_passes                           | UInt64  | 3       | Number of times that the optimizer will attempt to optimize the plan                                                                                                                                                                                                                                                                          |
-| datafusion.optimizer.prefer_hash_join                     | Boolean | true    | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficientlythan SortMergeJoin but consumes more memory. Defaults to true                                                                                                                                                        |
-| datafusion.optimizer.repartition_aggregations             | Boolean | true    | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level                                                                                                                                                                                                    |
-| datafusion.optimizer.repartition_joins                    | Boolean | true    | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level                                                                                                                                                                                                              |
-| datafusion.optimizer.repartition_windows                  | Boolean | true    | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level                                                                                                                                                                                             |
-| datafusion.optimizer.skip_failed_rules                    | Boolean | true    | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail.                                                                                                         |
-| datafusion.optimizer.top_down_join_key_reordering         | Boolean | true    | When set to true, the physical plan optimizer will run a top down process to reorder the join keys. Defaults to true                                                                                                                                                                                                                          |
+| key                                                       | default | description                                                                                                                                                                                                                                                                                                |
+| --------------------------------------------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| datafusion.catalog.create_default_catalog_and_schema      | true    | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system.                                                                                                                                                               |
+| datafusion.catalog.information_schema                     | false   | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information                                                                                                                                                                                                  |
+| datafusion.catalog.location                               | NULL    | Location scanned to load tables for `default` schema                                                                                                                                                                                                                                                       |
+| datafusion.catalog.format                                 | NULL    | Type of `TableProvider` to use when loading `default` schema                                                                                                                                                                                                                                               |
+| datafusion.catalog.has_header                             | false   | If the file has a header                                                                                                                                                                                                                                                                                   |
+| datafusion.execution.batch_size                           | 8192    | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption                                                                                                                       |
+| datafusion.execution.coalesce_batches                     | true    | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting |
+| datafusion.execution.collect_statistics                   | false   | Should DataFusion collect statistics after listing files                                                                                                                                                                                                                                                   |
+| datafusion.execution.target_partitions                    | 0       | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system                                                                                                                                                                |
+| datafusion.execution.time_zone                            | +00:00  | The default time zone Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the underlying datetime according to this time zone, and then extract the hour                                                                                                                                              |
+| datafusion.execution.parquet.enable_page_index            | false   | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                               |
+| datafusion.execution.parquet.pruning                      | true    | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file                                                                                                                                            |
+| datafusion.execution.parquet.skip_metadata                | true    | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata                                                          |
+| datafusion.execution.parquet.metadata_size_hint           | NULL    | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two read are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer                                     |
+| datafusion.execution.parquet.pushdown_filters             | false   | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded                                                                                                                                                                                      |
+| datafusion.execution.parquet.reorder_filters              | false   | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query                                                                                |
+| datafusion.optimizer.enable_round_robin_repartition       | true    | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores                                                                                                                                                                   |
+| datafusion.optimizer.filter_null_join_keys                | false   | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                            |
+| datafusion.optimizer.repartition_aggregations             | true    | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level"                                                                                                                                                                |
+| datafusion.optimizer.repartition_joins                    | true    | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level"                                                                                                                                                                          |
+| datafusion.optimizer.repartition_windows                  | true    | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level"                                                                                                                                                         |
+| datafusion.optimizer.skip_failed_rules                    | true    | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail                                                                       |
+| datafusion.optimizer.max_passes                           | 3       | Number of times that the optimizer will attempt to optimize the plan                                                                                                                                                                                                                                       |
+| datafusion.optimizer.top_down_join_key_reordering         | true    | When set to true, the physical plan optimizer will run a top down process to reorder the join keys                                                                                                                                                                                                         |
+| datafusion.optimizer.prefer_hash_join                     | true    | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory                                                                                                                                      |
+| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                             |
+| datafusion.explain.logical_plan_only                      | false   | When set to true, the explain statement will only print logical plans                                                                                                                                                                                                                                      |
+| datafusion.explain.physical_plan_only                     | false   | When set to true, the explain statement will only print physical plans                                                                                                                                                                                                                                     |
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index 54db76133..e1b3c5c18 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -23,10 +23,7 @@ use std::sync::Arc;
 
 use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion::common::ToDFSchema;
-use datafusion::config::{
-    OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
-    OPT_PARQUET_REORDER_FILTERS,
-};
+use datafusion::config::ConfigOptions;
 use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::error::Result;
@@ -61,10 +58,11 @@ pub struct ParquetScanOptions {
 impl ParquetScanOptions {
     /// Returns a [`SessionConfig`] with the given options
     pub fn config(&self) -> SessionConfig {
-        SessionConfig::new()
-            .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, self.pushdown_filters)
-            .set_bool(OPT_PARQUET_REORDER_FILTERS, self.reorder_filters)
-            .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, self.enable_page_index)
+        let mut config = ConfigOptions::new();
+        config.execution.parquet.pushdown_filters = self.pushdown_filters;
+        config.execution.parquet.reorder_filters = self.reorder_filters;
+        config.execution.parquet.enable_page_index = self.enable_page_index;
+        config.into()
     }
 }