You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/05/30 11:53:26 UTC
[arrow-ballista] 01/01: Add config to collect statistics
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch add_collect_statistics
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
commit 4d98d2682d98b4d280f314da00d4758fb3182b22
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Tue May 30 13:53:15 2023 +0200
Add config to collect statistics
---
ballista/client/src/prelude.rs | 2 +-
ballista/core/src/config.rs | 10 ++++++++++
.../scheduler/src/state/execution_graph/execution_stage.rs | 10 ++++++++--
ballista/scheduler/src/state/session_manager.rs | 2 ++
benchmarks/src/bin/tpch.rs | 3 ++-
python/src/context.rs | 2 ++
6 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs
index 5b73728e..c2984789 100644
--- a/ballista/client/src/prelude.rs
+++ b/ballista/client/src/prelude.rs
@@ -22,7 +22,7 @@ pub use ballista_core::{
BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING, BALLISTA_PLUGIN_DIR,
BALLISTA_REPARTITION_AGGREGATIONS, BALLISTA_REPARTITION_JOINS,
- BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA,
+ BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA, BALLISTA_COLLECT_STATISTICS
},
error::{BallistaError, Result},
};
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index c3981bef..653167ba 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -34,6 +34,8 @@ pub const BALLISTA_REPARTITION_JOINS: &str = "ballista.repartition.joins";
pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = "ballista.repartition.aggregations";
pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows";
pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning";
+pub const BALLISTA_COLLECT_STATISTICS: &str = "ballista.collect_statistics";
+
pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema";
/// give a plugin files dir, and then the dynamic library files in this dir will be load when scheduler state init.
pub const BALLISTA_PLUGIN_DIR: &str = "ballista.plugin_dir";
@@ -182,6 +184,10 @@ impl BallistaConfig {
ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
"Sets whether enable information_schema".to_string(),
DataType::Boolean, Some("false".to_string())),
+ ConfigEntry::new(BALLISTA_COLLECT_STATISTICS.to_string(),
+ "Configuration for collecting statistics during scan".to_string(),
+ DataType::Boolean, Some("true".to_string())
+ ),
ConfigEntry::new(BALLISTA_PLUGIN_DIR.to_string(),
"Sets the plugin dir".to_string(),
DataType::Utf8, Some("".to_string())),
@@ -224,6 +230,10 @@ impl BallistaConfig {
self.get_bool_setting(BALLISTA_PARQUET_PRUNING)
}
+ pub fn collect_statistics(&self) -> bool {
+ self.get_bool_setting(BALLISTA_COLLECT_STATISTICS)
+ }
+
pub fn default_with_information_schema(&self) -> bool {
self.get_bool_setting(BALLISTA_WITH_INFORMATION_SCHEMA)
}
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index b187d8e9..628e8c6f 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -22,6 +22,7 @@ use std::iter::FromIterator;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
+use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -362,9 +363,14 @@ impl UnresolvedStage {
&input_locations,
)?;
- // Optimize join order based on new resolved statistics
+ // Optimize plan based on new resolved statistics
let optimize_join = JoinSelection::new();
- let plan = optimize_join.optimize(plan, &SessionConfig::new())?;
+ let optimize_aggregate = AggregateStatistics::new();
+
+ let cfg: SessionConfig = SessionConfig::new();
+
+ let plan: Arc<dyn ExecutionPlan> = optimize_join.optimize(plan, &cfg)?;
+ let plan: Arc<dyn ExecutionPlan> = optimize_aggregate.optimize(plan, &cfg)?;
Ok(ResolvedStage::new(
self.stage_id,
diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs
index eb7df9f2..56701d0b 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -121,6 +121,7 @@ pub fn create_datafusion_context(
.with_repartition_joins(ballista_config.repartition_joins())
.with_repartition_aggregations(ballista_config.repartition_aggregations())
.with_repartition_windows(ballista_config.repartition_windows())
+ .with_collect_statistics(ballista_config.collect_statistics())
.with_parquet_pruning(ballista_config.parquet_pruning());
let config = propagate_ballista_configs(config, ballista_config);
@@ -142,6 +143,7 @@ pub fn update_datafusion_context(
.with_repartition_joins(ballista_config.repartition_joins())
.with_repartition_aggregations(ballista_config.repartition_aggregations())
.with_repartition_windows(ballista_config.repartition_windows())
+ .with_collect_statistics(ballista_config.collect_statistics())
.with_parquet_pruning(ballista_config.parquet_pruning());
let config = propagate_ballista_configs(config, ballista_config);
mut_state.config = config;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index aabfea6f..624cf9e4 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -20,7 +20,7 @@
use ballista::context::BallistaContext;
use ballista::prelude::{
BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
- BALLISTA_JOB_NAME,
+ BALLISTA_JOB_NAME, BALLISTA_COLLECT_STATISTICS,
};
use datafusion::arrow::array::*;
use datafusion::arrow::util::display::array_value_to_string;
@@ -363,6 +363,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
&format!("Query derived from TPC-H q{}", opt.query),
)
.set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
+ .set(BALLISTA_COLLECT_STATISTICS, "true")
.build()
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
diff --git a/python/src/context.rs b/python/src/context.rs
index 26c5661a..8b56f690 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -72,6 +72,7 @@ impl PySessionContext {
repartition_windows: bool,
parquet_pruning: bool,
target_partitions: Option<usize>,
+ collect_statistics: bool,
// TODO: config_options
) -> Self {
let cfg = SessionConfig::new()
@@ -81,6 +82,7 @@ impl PySessionContext {
.with_repartition_joins(repartition_joins)
.with_repartition_aggregations(repartition_aggregations)
.with_repartition_windows(repartition_windows)
+ .with_collect_statistics(collect_statistics)
.with_parquet_pruning(parquet_pruning);
let cfg_full = match target_partitions {