You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/29 08:09:53 UTC
[arrow-datafusion] branch master updated: Make SessionState members private (#4764)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 41c72cf51 Make SessionState members private (#4764)
41c72cf51 is described below
commit 41c72cf515389e90c20433dbcc4116a59016a15b
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Dec 29 08:09:48 2022 +0000
Make SessionState members private (#4764)
* Make SessionState members private
* Fix avro
---
benchmarks/src/bin/tpch.rs | 2 +-
datafusion/core/src/datasource/listing/table.rs | 8 ++--
.../core/src/datasource/listing_table_factory.rs | 4 +-
datafusion/core/src/execution/context.rs | 45 +++++++++++++++++-----
.../core/src/physical_plan/file_format/avro.rs | 2 +-
.../core/src/physical_plan/file_format/json.rs | 2 +-
.../core/src/physical_plan/file_format/parquet.rs | 2 +-
datafusion/core/src/physical_plan/planner.rs | 45 ++++++++++------------
datafusion/core/tests/parquet/page_pruning.rs | 2 +-
9 files changed, 67 insertions(+), 45 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 8b83ed799..d74e9b64d 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -409,7 +409,7 @@ async fn get_table(
let options = ListingOptions::new(format)
.with_file_extension(extension)
.with_target_partitions(target_partitions)
- .with_collect_stat(state.config.collect_statistics());
+ .with_collect_stat(state.config().collect_statistics());
let table_path = ListingTableUrl::parse(path)?;
let config = ListingTableConfig::new(table_path).with_listing_options(options);
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 8458cc4ae..99d009b34 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -146,7 +146,7 @@ impl ListingTableConfig {
/// Infer `ListingOptions` based on `table_path` suffix.
pub async fn infer_options(self, state: &SessionState) -> Result<Self> {
let store = state
- .runtime_env
+ .runtime_env()
.object_store(self.table_paths.get(0).unwrap())?;
let file = self
@@ -163,7 +163,7 @@ impl ListingTableConfig {
let listing_options = ListingOptions::new(format)
.with_file_extension(file_extension)
- .with_target_partitions(state.config.target_partitions());
+ .with_target_partitions(state.config().target_partitions());
Ok(Self {
table_paths: self.table_paths,
@@ -388,7 +388,7 @@ impl ListingOptions {
state: &SessionState,
table_path: &'a ListingTableUrl,
) -> Result<SchemaRef> {
- let store = state.runtime_env.object_store(table_path)?;
+ let store = state.runtime_env().object_store(table_path)?;
let files: Vec<_> = table_path
.list_all_files(store.as_ref(), &self.file_extension)
@@ -650,7 +650,7 @@ impl ListingTable {
limit: Option<usize>,
) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
let store = ctx
- .runtime_env
+ .runtime_env()
.object_store(self.table_paths.get(0).unwrap())?;
// list files (with partitions)
let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index b48a1058a..fe4393cb2 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -117,9 +117,9 @@ impl TableProviderFactory for ListingTableFactory {
};
let options = ListingOptions::new(file_format)
- .with_collect_stat(state.config.collect_statistics())
+ .with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
- .with_target_partitions(state.config.target_partitions())
+ .with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(None);
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ba750909f..323e50ebd 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1505,25 +1505,25 @@ impl SessionConfig {
#[derive(Clone)]
pub struct SessionState {
/// Uuid for the session
- pub session_id: String,
+ session_id: String,
/// Responsible for optimizing a logical plan
- pub optimizer: Optimizer,
+ optimizer: Optimizer,
/// Responsible for optimizing a physical execution plan
- pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+ physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
- pub query_planner: Arc<dyn QueryPlanner + Send + Sync>,
+ query_planner: Arc<dyn QueryPlanner + Send + Sync>,
/// Collection of catalogs containing schemas and ultimately TableProviders
- pub catalog_list: Arc<dyn CatalogList>,
+ catalog_list: Arc<dyn CatalogList>,
/// Scalar functions that are registered with the context
- pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
+ scalar_functions: HashMap<String, Arc<ScalarUDF>>,
/// Aggregate functions registered in the context
- pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+ aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
/// Session configuration
- pub config: SessionConfig,
+ config: SessionConfig,
/// Execution properties
- pub execution_props: ExecutionProps,
+ execution_props: ExecutionProps,
/// Runtime environment
- pub runtime_env: Arc<RuntimeEnv>,
+ runtime_env: Arc<RuntimeEnv>,
}
impl Debug for SessionState {
@@ -1840,6 +1840,31 @@ impl SessionState {
.await
}
+ /// Return the session ID
+ pub fn session_id(&self) -> &str {
+ &self.session_id
+ }
+
+ /// Return the runtime env
+ pub fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+ &self.runtime_env
+ }
+
+ /// Return the execution properties
+ pub fn execution_props(&self) -> &ExecutionProps {
+ &self.execution_props
+ }
+
+ /// Return the [`SessionConfig`]
+ pub fn config(&self) -> &SessionConfig {
+ &self.config
+ }
+
+ /// Return the physical optimizers
+ pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
+ &self.physical_optimizers
+ }
+
/// return the configuration options
pub fn config_options(&self) -> &ConfigOptions {
self.config.config_options()
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 91d27c3e3..f67906e8a 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -243,7 +243,7 @@ mod tests {
let state = session_ctx.state();
state
- .runtime_env
+ .runtime_env()
.register_object_store("file", "", store.clone());
let testdata = crate::test_util::arrow_test_data();
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 14fe0128c..3edb74025 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -275,7 +275,7 @@ mod tests {
file_compression_type: FileCompressionType,
) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
let store_url = ObjectStoreUrl::local_filesystem();
- let store = state.runtime_env.object_store(&store_url).unwrap();
+ let store = state.runtime_env().object_store(&store_url).unwrap();
let filename = "1.json";
let file_groups = partitioned_file_groups(
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 0ad41d994..f6e7d2ded 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1403,7 +1403,7 @@ mod tests {
let task_ctx = session_ctx.task_ctx();
let object_store_url = ObjectStoreUrl::local_filesystem();
- let store = state.runtime_env.object_store(&object_store_url).unwrap();
+ let store = state.runtime_env().object_store(&object_store_url).unwrap();
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/alltypes_plain.parquet", testdata);
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 6aa3f627d..5b001f016 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -448,7 +448,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
expr,
input_dfschema,
input_schema,
- &session_state.execution_props,
+ session_state.execution_props(),
)
}
}
@@ -527,8 +527,8 @@ impl DefaultPhysicalPlanner {
let partition_keys = window_expr_common_partition_keys(window_expr)?;
let can_repartition = !partition_keys.is_empty()
- && session_state.config.target_partitions() > 1
- && session_state.config.repartition_window_functions();
+ && session_state.config().target_partitions() > 1
+ && session_state.config().repartition_window_functions();
let physical_partition_keys = if can_repartition
{
@@ -596,7 +596,7 @@ impl DefaultPhysicalPlanner {
descending: !*asc,
nulls_first: *nulls_first,
},
- &session_state.execution_props,
+ session_state.execution_props(),
),
_ => unreachable!(),
})
@@ -612,7 +612,7 @@ impl DefaultPhysicalPlanner {
e,
logical_input_schema,
&physical_input_schema,
- &session_state.execution_props,
+ session_state.execution_props(),
)
})
.collect::<Result<Vec<_>>>()?;
@@ -649,7 +649,7 @@ impl DefaultPhysicalPlanner {
e,
logical_input_schema,
&physical_input_schema,
- &session_state.execution_props,
+ session_state.execution_props(),
)
})
.collect::<Result<Vec<_>>>()?;
@@ -666,8 +666,8 @@ impl DefaultPhysicalPlanner {
let final_group: Vec<Arc<dyn PhysicalExpr>> = initial_aggr.output_group_expr();
let can_repartition = !groups.is_empty()
- && session_state.config.target_partitions() > 1
- && session_state.config.repartition_aggregations();
+ && session_state.config().target_partitions() > 1
+ && session_state.config().repartition_aggregations();
let (initial_aggr, next_partition_mode): (
Arc<dyn ExecutionPlan>,
@@ -833,7 +833,7 @@ impl DefaultPhysicalPlanner {
descending: !*asc,
nulls_first: *nulls_first,
},
- &session_state.execution_props,
+ session_state.execution_props(),
),
_ => Err(DataFusionError::Plan(
"Sort only accepts sort expressions".to_string(),
@@ -982,7 +982,7 @@ impl DefaultPhysicalPlanner {
expr,
&filter_df_schema,
&filter_schema,
- &session_state.execution_props,
+ session_state.execution_props(),
)?;
let column_indices = join_utils::JoinFilter::build_column_indices(left_field_indices, right_field_indices);
@@ -995,7 +995,7 @@ impl DefaultPhysicalPlanner {
_ => None
};
- let prefer_hash_join = session_state.config.config_options()
+ let prefer_hash_join = session_state.config().config_options()
.get_bool(OPT_PREFER_HASH_JOIN)
.unwrap_or_default();
if join_on.is_empty() {
@@ -1007,8 +1007,8 @@ impl DefaultPhysicalPlanner {
join_filter,
join_type,
)?))
- } else if session_state.config.target_partitions() > 1
- && session_state.config.repartition_joins()
+ } else if session_state.config().target_partitions() > 1
+ && session_state.config().repartition_joins()
&& !prefer_hash_join
{
// Use SortMergeJoin if hash join is not preferred
@@ -1027,11 +1027,11 @@ impl DefaultPhysicalPlanner {
*null_equals_null,
)?))
}
- } else if session_state.config.target_partitions() > 1
- && session_state.config.repartition_joins()
+ } else if session_state.config().target_partitions() > 1
+ && session_state.config().repartition_joins()
&& prefer_hash_join {
let partition_mode = {
- if session_state.config.collect_statistics() {
+ if session_state.config().collect_statistics() {
PartitionMode::Auto
} else {
PartitionMode::Partitioned
@@ -1454,7 +1454,7 @@ fn get_null_physical_expr_pair(
expr,
input_dfschema,
input_schema,
- &session_state.execution_props,
+ session_state.execution_props(),
)?;
let physical_name = physical_name(&expr.clone())?;
@@ -1475,7 +1475,7 @@ fn get_physical_expr_pair(
expr,
input_dfschema,
input_schema,
- &session_state.execution_props,
+ session_state.execution_props(),
)?;
let physical_name = physical_name(expr)?;
Ok((physical_expr, physical_name))
@@ -1716,7 +1716,6 @@ impl DefaultPhysicalPlanner {
let mut stringified_plans = vec![];
if !session_state
- .config
.config_options()
.get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY)
.unwrap_or_default()
@@ -1727,7 +1726,6 @@ impl DefaultPhysicalPlanner {
}
if !session_state
- .config
.config_options()
.get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY)
.unwrap_or_default()
@@ -1773,7 +1771,7 @@ impl DefaultPhysicalPlanner {
where
F: FnMut(&dyn ExecutionPlan, &dyn PhysicalOptimizerRule),
{
- let optimizers = &session_state.physical_optimizers;
+ let optimizers = session_state.physical_optimizers();
debug!(
"Input physical plan:\n{}\n",
displayable(plan.as_ref()).indent()
@@ -1844,15 +1842,14 @@ mod tests {
fn make_session_state() -> SessionState {
let runtime = Arc::new(RuntimeEnv::default());
- let config = SessionConfig::new();
+ let config = SessionConfig::new().with_target_partitions(4);
// TODO we should really test that no optimizer rules are failing here
// let config = config.set_bool(crate::config::OPT_OPTIMIZER_SKIP_FAILED_RULES, false);
SessionState::with_config_rt(config, runtime)
}
async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
- let mut session_state = make_session_state();
- session_state.config = session_state.config.with_target_partitions(4);
+ let session_state = make_session_state();
// optimize the logical plan
let logical_plan = session_state.optimize(logical_plan)?;
let planner = DefaultPhysicalPlanner::default();
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 73a6c6419..851c15e8c 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -33,7 +33,7 @@ use tokio_stream::StreamExt;
async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
let object_store_url = ObjectStoreUrl::local_filesystem();
- let store = state.runtime_env.object_store(&object_store_url).unwrap();
+ let store = state.runtime_env().object_store(&object_store_url).unwrap();
let testdata = datafusion::test_util::parquet_test_data();
let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);