Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/29 08:09:53 UTC

[arrow-datafusion] branch master updated: Make SessionState members private (#4764)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 41c72cf51 Make SessionState members private (#4764)
41c72cf51 is described below

commit 41c72cf515389e90c20433dbcc4116a59016a15b
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Dec 29 08:09:48 2022 +0000

    Make SessionState members private (#4764)
    
    * Make SessionState members private
    
    * Fix avro
---
 benchmarks/src/bin/tpch.rs                         |  2 +-
 datafusion/core/src/datasource/listing/table.rs    |  8 ++--
 .../core/src/datasource/listing_table_factory.rs   |  4 +-
 datafusion/core/src/execution/context.rs           | 45 +++++++++++++++++-----
 .../core/src/physical_plan/file_format/avro.rs     |  2 +-
 .../core/src/physical_plan/file_format/json.rs     |  2 +-
 .../core/src/physical_plan/file_format/parquet.rs  |  2 +-
 datafusion/core/src/physical_plan/planner.rs       | 45 ++++++++++------------
 datafusion/core/tests/parquet/page_pruning.rs      |  2 +-
 9 files changed, 67 insertions(+), 45 deletions(-)
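
For code built on top of DataFusion, the practical effect of this commit is that direct field access on SessionState (e.g. `state.config`, `state.runtime_env`) no longer compiles; callers go through the new accessor methods instead. A minimal migration sketch, assuming a stock SessionContext and using only the accessors added in this patch:

    use datafusion::prelude::SessionContext;

    fn main() {
        // Build a default context and take a snapshot of its state.
        let ctx = SessionContext::new();
        let state = ctx.state();

        // Before this commit: state.config.target_partitions()
        // After: the `config` field is private, so use the config() accessor.
        let target_partitions = state.config().target_partitions();

        // Before this commit: state.runtime_env.object_store(&url)
        // After: reach the RuntimeEnv through runtime_env().
        let _runtime_env = state.runtime_env();

        println!(
            "session {} planning with {} target partitions",
            state.session_id(),
            target_partitions
        );
    }

The same substitution applies to the other fields covered by accessors below (execution_props(), physical_optimizers(), config_options()).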

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 8b83ed799..d74e9b64d 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -409,7 +409,7 @@ async fn get_table(
     let options = ListingOptions::new(format)
         .with_file_extension(extension)
         .with_target_partitions(target_partitions)
-        .with_collect_stat(state.config.collect_statistics());
+        .with_collect_stat(state.config().collect_statistics());
 
     let table_path = ListingTableUrl::parse(path)?;
     let config = ListingTableConfig::new(table_path).with_listing_options(options);
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 8458cc4ae..99d009b34 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -146,7 +146,7 @@ impl ListingTableConfig {
     /// Infer `ListingOptions` based on `table_path` suffix.
     pub async fn infer_options(self, state: &SessionState) -> Result<Self> {
         let store = state
-            .runtime_env
+            .runtime_env()
             .object_store(self.table_paths.get(0).unwrap())?;
 
         let file = self
@@ -163,7 +163,7 @@ impl ListingTableConfig {
 
         let listing_options = ListingOptions::new(format)
             .with_file_extension(file_extension)
-            .with_target_partitions(state.config.target_partitions());
+            .with_target_partitions(state.config().target_partitions());
 
         Ok(Self {
             table_paths: self.table_paths,
@@ -388,7 +388,7 @@ impl ListingOptions {
         state: &SessionState,
         table_path: &'a ListingTableUrl,
     ) -> Result<SchemaRef> {
-        let store = state.runtime_env.object_store(table_path)?;
+        let store = state.runtime_env().object_store(table_path)?;
 
         let files: Vec<_> = table_path
             .list_all_files(store.as_ref(), &self.file_extension)
@@ -650,7 +650,7 @@ impl ListingTable {
         limit: Option<usize>,
     ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
         let store = ctx
-            .runtime_env
+            .runtime_env()
             .object_store(self.table_paths.get(0).unwrap())?;
         // list files (with partitions)
         let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index b48a1058a..fe4393cb2 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -117,9 +117,9 @@ impl TableProviderFactory for ListingTableFactory {
         };
 
         let options = ListingOptions::new(file_format)
-            .with_collect_stat(state.config.collect_statistics())
+            .with_collect_stat(state.config().collect_statistics())
             .with_file_extension(file_extension)
-            .with_target_partitions(state.config.target_partitions())
+            .with_target_partitions(state.config().target_partitions())
             .with_table_partition_cols(table_partition_cols)
             .with_file_sort_order(None);
 
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ba750909f..323e50ebd 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1505,25 +1505,25 @@ impl SessionConfig {
 #[derive(Clone)]
 pub struct SessionState {
     /// Uuid for the session
-    pub session_id: String,
+    session_id: String,
     /// Responsible for optimizing a logical plan
-    pub optimizer: Optimizer,
+    optimizer: Optimizer,
     /// Responsible for optimizing a physical execution plan
-    pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+    physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
     /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
-    pub query_planner: Arc<dyn QueryPlanner + Send + Sync>,
+    query_planner: Arc<dyn QueryPlanner + Send + Sync>,
     /// Collection of catalogs containing schemas and ultimately TableProviders
-    pub catalog_list: Arc<dyn CatalogList>,
+    catalog_list: Arc<dyn CatalogList>,
     /// Scalar functions that are registered with the context
-    pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
+    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
     /// Aggregate functions registered in the context
-    pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
     /// Session configuration
-    pub config: SessionConfig,
+    config: SessionConfig,
     /// Execution properties
-    pub execution_props: ExecutionProps,
+    execution_props: ExecutionProps,
     /// Runtime environment
-    pub runtime_env: Arc<RuntimeEnv>,
+    runtime_env: Arc<RuntimeEnv>,
 }
 
 impl Debug for SessionState {
@@ -1840,6 +1840,31 @@ impl SessionState {
             .await
     }
 
+    /// Return the session ID
+    pub fn session_id(&self) -> &str {
+        &self.session_id
+    }
+
+    /// Return the runtime env
+    pub fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+        &self.runtime_env
+    }
+
+    /// Return the execution properties
+    pub fn execution_props(&self) -> &ExecutionProps {
+        &self.execution_props
+    }
+
+    /// Return the [`SessionConfig`]
+    pub fn config(&self) -> &SessionConfig {
+        &self.config
+    }
+
+    /// Return the physical optimizers
+    pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
+        &self.physical_optimizers
+    }
+
     /// return the configuration options
     pub fn config_options(&self) -> &ConfigOptions {
         self.config.config_options()
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 91d27c3e3..f67906e8a 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -243,7 +243,7 @@ mod tests {
         let state = session_ctx.state();
 
         state
-            .runtime_env
+            .runtime_env()
             .register_object_store("file", "", store.clone());
 
         let testdata = crate::test_util::arrow_test_data();
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 14fe0128c..3edb74025 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -275,7 +275,7 @@ mod tests {
         file_compression_type: FileCompressionType,
     ) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
         let store_url = ObjectStoreUrl::local_filesystem();
-        let store = state.runtime_env.object_store(&store_url).unwrap();
+        let store = state.runtime_env().object_store(&store_url).unwrap();
 
         let filename = "1.json";
         let file_groups = partitioned_file_groups(
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 0ad41d994..f6e7d2ded 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1403,7 +1403,7 @@ mod tests {
         let task_ctx = session_ctx.task_ctx();
 
         let object_store_url = ObjectStoreUrl::local_filesystem();
-        let store = state.runtime_env.object_store(&object_store_url).unwrap();
+        let store = state.runtime_env().object_store(&object_store_url).unwrap();
 
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 6aa3f627d..5b001f016 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -448,7 +448,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
             expr,
             input_dfschema,
             input_schema,
-            &session_state.execution_props,
+            session_state.execution_props(),
         )
     }
 }
@@ -527,8 +527,8 @@ impl DefaultPhysicalPlanner {
                     let partition_keys = window_expr_common_partition_keys(window_expr)?;
 
                     let can_repartition = !partition_keys.is_empty()
-                        && session_state.config.target_partitions() > 1
-                        && session_state.config.repartition_window_functions();
+                        && session_state.config().target_partitions() > 1
+                        && session_state.config().repartition_window_functions();
 
                     let physical_partition_keys = if can_repartition
                     {
@@ -596,7 +596,7 @@ impl DefaultPhysicalPlanner {
                                         descending: !*asc,
                                         nulls_first: *nulls_first,
                                     },
-                                    &session_state.execution_props,
+                                    session_state.execution_props(),
                                 ),
                                 _ => unreachable!(),
                             })
@@ -612,7 +612,7 @@ impl DefaultPhysicalPlanner {
                                 e,
                                 logical_input_schema,
                                 &physical_input_schema,
-                                &session_state.execution_props,
+                                session_state.execution_props(),
                             )
                         })
                         .collect::<Result<Vec<_>>>()?;
@@ -649,7 +649,7 @@ impl DefaultPhysicalPlanner {
                                 e,
                                 logical_input_schema,
                                 &physical_input_schema,
-                                &session_state.execution_props,
+                                session_state.execution_props(),
                             )
                         })
                         .collect::<Result<Vec<_>>>()?;
@@ -666,8 +666,8 @@ impl DefaultPhysicalPlanner {
                     let final_group: Vec<Arc<dyn PhysicalExpr>> = initial_aggr.output_group_expr();
 
                     let can_repartition = !groups.is_empty()
-                        && session_state.config.target_partitions() > 1
-                        && session_state.config.repartition_aggregations();
+                        && session_state.config().target_partitions() > 1
+                        && session_state.config().repartition_aggregations();
 
                     let (initial_aggr, next_partition_mode): (
                         Arc<dyn ExecutionPlan>,
@@ -833,7 +833,7 @@ impl DefaultPhysicalPlanner {
                                     descending: !*asc,
                                     nulls_first: *nulls_first,
                                 },
-                                &session_state.execution_props,
+                                session_state.execution_props(),
                             ),
                             _ => Err(DataFusionError::Plan(
                                 "Sort only accepts sort expressions".to_string(),
@@ -982,7 +982,7 @@ impl DefaultPhysicalPlanner {
                                 expr,
                                 &filter_df_schema,
                                 &filter_schema,
-                                &session_state.execution_props,
+                                session_state.execution_props(),
                             )?;
                             let column_indices = join_utils::JoinFilter::build_column_indices(left_field_indices, right_field_indices);
 
@@ -995,7 +995,7 @@ impl DefaultPhysicalPlanner {
                         _ => None
                     };
 
-                    let prefer_hash_join = session_state.config.config_options()
+                    let prefer_hash_join = session_state.config().config_options()
                         .get_bool(OPT_PREFER_HASH_JOIN)
                         .unwrap_or_default();
                     if join_on.is_empty() {
@@ -1007,8 +1007,8 @@ impl DefaultPhysicalPlanner {
                             join_filter,
                             join_type,
                         )?))
-                    } else if session_state.config.target_partitions() > 1
-                        && session_state.config.repartition_joins()
+                    } else if session_state.config().target_partitions() > 1
+                        && session_state.config().repartition_joins()
                         && !prefer_hash_join
                     {
                         // Use SortMergeJoin if hash join is not preferred
@@ -1027,11 +1027,11 @@ impl DefaultPhysicalPlanner {
                                 *null_equals_null,
                             )?))
                         }
-                    } else if session_state.config.target_partitions() > 1
-                        && session_state.config.repartition_joins()
+                    } else if session_state.config().target_partitions() > 1
+                        && session_state.config().repartition_joins()
                         && prefer_hash_join {
                          let partition_mode = {
-                            if session_state.config.collect_statistics() {
+                            if session_state.config().collect_statistics() {
                                 PartitionMode::Auto
                             } else {
                                 PartitionMode::Partitioned
@@ -1454,7 +1454,7 @@ fn get_null_physical_expr_pair(
         expr,
         input_dfschema,
         input_schema,
-        &session_state.execution_props,
+        session_state.execution_props(),
     )?;
     let physical_name = physical_name(&expr.clone())?;
 
@@ -1475,7 +1475,7 @@ fn get_physical_expr_pair(
         expr,
         input_dfschema,
         input_schema,
-        &session_state.execution_props,
+        session_state.execution_props(),
     )?;
     let physical_name = physical_name(expr)?;
     Ok((physical_expr, physical_name))
@@ -1716,7 +1716,6 @@ impl DefaultPhysicalPlanner {
             let mut stringified_plans = vec![];
 
             if !session_state
-                .config
                 .config_options()
                 .get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY)
                 .unwrap_or_default()
@@ -1727,7 +1726,6 @@ impl DefaultPhysicalPlanner {
             }
 
             if !session_state
-                .config
                 .config_options()
                 .get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY)
                 .unwrap_or_default()
@@ -1773,7 +1771,7 @@ impl DefaultPhysicalPlanner {
     where
         F: FnMut(&dyn ExecutionPlan, &dyn PhysicalOptimizerRule),
     {
-        let optimizers = &session_state.physical_optimizers;
+        let optimizers = session_state.physical_optimizers();
         debug!(
             "Input physical plan:\n{}\n",
             displayable(plan.as_ref()).indent()
@@ -1844,15 +1842,14 @@ mod tests {
 
     fn make_session_state() -> SessionState {
         let runtime = Arc::new(RuntimeEnv::default());
-        let config = SessionConfig::new();
+        let config = SessionConfig::new().with_target_partitions(4);
         // TODO we should really test that no optimizer rules are failing here
         // let config = config.set_bool(crate::config::OPT_OPTIMIZER_SKIP_FAILED_RULES, false);
         SessionState::with_config_rt(config, runtime)
     }
 
     async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut session_state = make_session_state();
-        session_state.config = session_state.config.with_target_partitions(4);
+        let session_state = make_session_state();
         // optimize the logical plan
         let logical_plan = session_state.optimize(logical_plan)?;
         let planner = DefaultPhysicalPlanner::default();
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 73a6c6419..851c15e8c 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -33,7 +33,7 @@ use tokio_stream::StreamExt;
 
 async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
     let object_store_url = ObjectStoreUrl::local_filesystem();
-    let store = state.runtime_env.object_store(&object_store_url).unwrap();
+    let store = state.runtime_env().object_store(&object_store_url).unwrap();
 
     let testdata = datafusion::test_util::parquet_test_data();
     let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);