You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/05/30 11:53:26 UTC

[arrow-ballista] 01/01: Add config to collect statistics

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch add_collect_statistics
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 4d98d2682d98b4d280f314da00d4758fb3182b22
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Tue May 30 13:53:15 2023 +0200

    Add config to collect statistics
---
 ballista/client/src/prelude.rs                                 |  2 +-
 ballista/core/src/config.rs                                    | 10 ++++++++++
 .../scheduler/src/state/execution_graph/execution_stage.rs     | 10 ++++++++--
 ballista/scheduler/src/state/session_manager.rs                |  2 ++
 benchmarks/src/bin/tpch.rs                                     |  3 ++-
 python/src/context.rs                                          |  2 ++
 6 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs
index 5b73728e..c2984789 100644
--- a/ballista/client/src/prelude.rs
+++ b/ballista/client/src/prelude.rs
@@ -22,7 +22,7 @@ pub use ballista_core::{
         BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
         BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING, BALLISTA_PLUGIN_DIR,
         BALLISTA_REPARTITION_AGGREGATIONS, BALLISTA_REPARTITION_JOINS,
-        BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA,
+        BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA, BALLISTA_COLLECT_STATISTICS
     },
     error::{BallistaError, Result},
 };
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index c3981bef..653167ba 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -34,6 +34,8 @@ pub const BALLISTA_REPARTITION_JOINS: &str = "ballista.repartition.joins";
 pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = "ballista.repartition.aggregations";
 pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows";
 pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning";
+pub const BALLISTA_COLLECT_STATISTICS: &str = "ballista.collect_statistics";
+
 pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema";
 /// give a plugin files dir, and then the dynamic library files in this dir will be load when scheduler state init.
 pub const BALLISTA_PLUGIN_DIR: &str = "ballista.plugin_dir";
@@ -182,6 +184,10 @@ impl BallistaConfig {
             ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
                              "Sets whether enable information_schema".to_string(),
                              DataType::Boolean, Some("false".to_string())),
+            ConfigEntry::new(BALLISTA_COLLECT_STATISTICS.to_string(),
+                "Configuration for collecting statistics during scan".to_string(),
+                DataType::Boolean, Some("true".to_string())
+            ),
             ConfigEntry::new(BALLISTA_PLUGIN_DIR.to_string(),
                              "Sets the plugin dir".to_string(),
                              DataType::Utf8, Some("".to_string())),
@@ -224,6 +230,10 @@ impl BallistaConfig {
         self.get_bool_setting(BALLISTA_PARQUET_PRUNING)
     }
 
+    pub fn collect_statistics(&self) -> bool {
+        self.get_bool_setting(BALLISTA_COLLECT_STATISTICS)
+    }
+
     pub fn default_with_information_schema(&self) -> bool {
         self.get_bool_setting(BALLISTA_WITH_INFORMATION_SCHEMA)
     }
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index b187d8e9..628e8c6f 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -22,6 +22,7 @@ use std::iter::FromIterator;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
+use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
 use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -362,9 +363,14 @@ impl UnresolvedStage {
             &input_locations,
         )?;
 
-        // Optimize join order based on new resolved statistics
+        // Optimize plan based on new resolved statistics
         let optimize_join = JoinSelection::new();
-        let plan = optimize_join.optimize(plan, &SessionConfig::new())?;
+        let optimize_aggregate = AggregateStatistics::new();
+
+        let cfg: SessionConfig = SessionConfig::new();
+
+        let plan: Arc<dyn ExecutionPlan> = optimize_join.optimize(plan, &cfg)?;
+        let plan: Arc<dyn ExecutionPlan> = optimize_aggregate.optimize(plan, &cfg)?;
 
         Ok(ResolvedStage::new(
             self.stage_id,
diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs
index eb7df9f2..56701d0b 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -121,6 +121,7 @@ pub fn create_datafusion_context(
         .with_repartition_joins(ballista_config.repartition_joins())
         .with_repartition_aggregations(ballista_config.repartition_aggregations())
         .with_repartition_windows(ballista_config.repartition_windows())
+        .with_collect_statistics(ballista_config.collect_statistics())
         .with_parquet_pruning(ballista_config.parquet_pruning());
     let config = propagate_ballista_configs(config, ballista_config);
 
@@ -142,6 +143,7 @@ pub fn update_datafusion_context(
             .with_repartition_joins(ballista_config.repartition_joins())
             .with_repartition_aggregations(ballista_config.repartition_aggregations())
             .with_repartition_windows(ballista_config.repartition_windows())
+            .with_collect_statistics(ballista_config.collect_statistics())
             .with_parquet_pruning(ballista_config.parquet_pruning());
         let config = propagate_ballista_configs(config, ballista_config);
         mut_state.config = config;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index aabfea6f..624cf9e4 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -20,7 +20,7 @@
 use ballista::context::BallistaContext;
 use ballista::prelude::{
     BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
-    BALLISTA_JOB_NAME,
+    BALLISTA_JOB_NAME, BALLISTA_COLLECT_STATISTICS,
 };
 use datafusion::arrow::array::*;
 use datafusion::arrow::util::display::array_value_to_string;
@@ -363,6 +363,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
             &format!("Query derived from TPC-H q{}", opt.query),
         )
         .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
+        .set(BALLISTA_COLLECT_STATISTICS, "true")
         .build()
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
 
diff --git a/python/src/context.rs b/python/src/context.rs
index 26c5661a..8b56f690 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -72,6 +72,7 @@ impl PySessionContext {
         repartition_windows: bool,
         parquet_pruning: bool,
         target_partitions: Option<usize>,
+        collect_statistics: bool,
         // TODO: config_options
     ) -> Self {
         let cfg = SessionConfig::new()
@@ -81,6 +82,7 @@ impl PySessionContext {
             .with_repartition_joins(repartition_joins)
             .with_repartition_aggregations(repartition_aggregations)
             .with_repartition_windows(repartition_windows)
+            .with_collect_statistics(collect_statistics)
             .with_parquet_pruning(parquet_pruning);
 
         let cfg_full = match target_partitions {