You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/29 15:31:08 UTC
[arrow-datafusion] branch master updated: Remove TaskProperties / KV structure (#4382)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1438bc4ca Remove TaskProperties / KV structure (#4382)
1438bc4ca is described below
commit 1438bc4ca329e7887ab2dd1c2697ba4038255bdd
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Nov 29 10:30:55 2022 -0500
Remove TaskProperties / KV structure (#4382)
---
datafusion/core/src/execution/context.rs | 110 +++++++++---------------
datafusion/core/src/physical_plan/sorts/sort.rs | 2 +-
2 files changed, 42 insertions(+), 70 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 736cf648f..2dd270459 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1810,22 +1810,14 @@ impl FunctionRegistry for SessionState {
}
}
-/// Task Context Properties
-pub enum TaskProperties {
- ///SessionConfig
- SessionConfig(SessionConfig),
- /// Name-value pairs of task properties
- KVPairs(HashMap<String, String>),
-}
-
/// Task Execution Context
pub struct TaskContext {
/// Session Id
session_id: String,
/// Optional Task Identify
task_id: Option<String>,
- /// Task properties
- properties: TaskProperties,
+ /// Session configuration
+ session_config: SessionConfig,
/// Scalar functions associated with this task context
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
/// Aggregate functions associated with this task context
@@ -1844,10 +1836,43 @@ impl TaskContext {
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
runtime: Arc<RuntimeEnv>,
) -> Self {
+ let session_config = if task_props.is_empty() {
+ SessionConfig::new()
+ } else {
+ SessionConfig::new()
+ .with_batch_size(task_props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap())
+ .with_target_partitions(
+ task_props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
+ )
+ .with_repartition_joins(
+ task_props.get(REPARTITION_JOINS).unwrap().parse().unwrap(),
+ )
+ .with_repartition_aggregations(
+ task_props
+ .get(REPARTITION_AGGREGATIONS)
+ .unwrap()
+ .parse()
+ .unwrap(),
+ )
+ .with_repartition_windows(
+ task_props
+ .get(REPARTITION_WINDOWS)
+ .unwrap()
+ .parse()
+ .unwrap(),
+ )
+ .with_parquet_pruning(
+ task_props.get(PARQUET_PRUNING).unwrap().parse().unwrap(),
+ )
+ .with_collect_statistics(
+ task_props.get(COLLECT_STATISTICS).unwrap().parse().unwrap(),
+ )
+ };
+
Self {
task_id: Some(task_id),
session_id,
- properties: TaskProperties::KVPairs(task_props),
+ session_config,
scalar_functions,
aggregate_functions,
runtime,
@@ -1855,44 +1880,8 @@ impl TaskContext {
}
/// Return the SessionConfig associated with the Task
- pub fn session_config(&self) -> SessionConfig {
- let task_props = &self.properties;
- match task_props {
- TaskProperties::KVPairs(props) => {
- let session_config = SessionConfig::new();
- if props.is_empty() {
- session_config
- } else {
- session_config
- .with_batch_size(
- props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap(),
- )
- .with_target_partitions(
- props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
- )
- .with_repartition_joins(
- props.get(REPARTITION_JOINS).unwrap().parse().unwrap(),
- )
- .with_repartition_aggregations(
- props
- .get(REPARTITION_AGGREGATIONS)
- .unwrap()
- .parse()
- .unwrap(),
- )
- .with_repartition_windows(
- props.get(REPARTITION_WINDOWS).unwrap().parse().unwrap(),
- )
- .with_parquet_pruning(
- props.get(PARQUET_PRUNING).unwrap().parse().unwrap(),
- )
- .with_collect_statistics(
- props.get(COLLECT_STATISTICS).unwrap().parse().unwrap(),
- )
- }
- }
- TaskProperties::SessionConfig(session_config) => session_config.clone(),
- }
+ pub fn session_config(&self) -> &SessionConfig {
+ &self.session_config
}
/// Return the session_id of this [TaskContext]
@@ -1914,24 +1903,7 @@ impl TaskContext {
/// Create a new task context instance from SessionContext
impl From<&SessionContext> for TaskContext {
fn from(session: &SessionContext) -> Self {
- let session_id = session.session_id.clone();
- let (config, scalar_functions, aggregate_functions) = {
- let session_state = session.state.read();
- (
- session_state.config.clone(),
- session_state.scalar_functions.clone(),
- session_state.aggregate_functions.clone(),
- )
- };
- let runtime = session.runtime_env();
- Self {
- task_id: None,
- session_id,
- properties: TaskProperties::SessionConfig(config),
- scalar_functions,
- aggregate_functions,
- runtime,
- }
+ TaskContext::from(&*session.state.read())
}
}
@@ -1939,14 +1911,14 @@ impl From<&SessionContext> for TaskContext {
impl From<&SessionState> for TaskContext {
fn from(state: &SessionState) -> Self {
let session_id = state.session_id.clone();
- let config = state.config.clone();
+ let session_config = state.config.clone();
let scalar_functions = state.scalar_functions.clone();
let aggregate_functions = state.aggregate_functions.clone();
let runtime = state.runtime_env.clone();
Self {
task_id: None,
session_id,
- properties: TaskProperties::SessionConfig(config),
+ session_config,
scalar_functions,
aggregate_functions,
runtime,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 6e87ba76c..bfc33a954 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -916,7 +916,7 @@ async fn do_sort(
schema.clone(),
expr,
metrics_set,
- Arc::new(context.session_config()),
+ Arc::new(context.session_config().clone()),
context.runtime_env(),
fetch,
);