You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/05/30 11:53:25 UTC

[arrow-ballista] branch add_collect_statistics created (now 4d98d268)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch add_collect_statistics
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


      at 4d98d268 Add config to collect statistics

This branch includes the following new commits:

     new 4d98d268 Add config to collect statistics

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow-ballista] 01/01: Add config to collect statistics

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch add_collect_statistics
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 4d98d2682d98b4d280f314da00d4758fb3182b22
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Tue May 30 13:53:15 2023 +0200

    Add config to collect statistics
---
 ballista/client/src/prelude.rs                                 |  2 +-
 ballista/core/src/config.rs                                    | 10 ++++++++++
 .../scheduler/src/state/execution_graph/execution_stage.rs     | 10 ++++++++--
 ballista/scheduler/src/state/session_manager.rs                |  2 ++
 benchmarks/src/bin/tpch.rs                                     |  3 ++-
 python/src/context.rs                                          |  2 ++
 6 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs
index 5b73728e..c2984789 100644
--- a/ballista/client/src/prelude.rs
+++ b/ballista/client/src/prelude.rs
@@ -22,7 +22,7 @@ pub use ballista_core::{
         BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
         BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING, BALLISTA_PLUGIN_DIR,
         BALLISTA_REPARTITION_AGGREGATIONS, BALLISTA_REPARTITION_JOINS,
-        BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA,
+        BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA, BALLISTA_COLLECT_STATISTICS
     },
     error::{BallistaError, Result},
 };
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index c3981bef..653167ba 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -34,6 +34,8 @@ pub const BALLISTA_REPARTITION_JOINS: &str = "ballista.repartition.joins";
 pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = "ballista.repartition.aggregations";
 pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows";
 pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning";
+pub const BALLISTA_COLLECT_STATISTICS: &str = "ballista.collect_statistics";
+
 pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema";
 /// give a plugin files dir, and then the dynamic library files in this dir will be load when scheduler state init.
 pub const BALLISTA_PLUGIN_DIR: &str = "ballista.plugin_dir";
@@ -182,6 +184,10 @@ impl BallistaConfig {
             ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
                              "Sets whether enable information_schema".to_string(),
                              DataType::Boolean, Some("false".to_string())),
+            ConfigEntry::new(BALLISTA_COLLECT_STATISTICS.to_string(),
+                "Configuration for collecting statistics during scan".to_string(),
+                DataType::Boolean, Some("true".to_string())
+            ),
             ConfigEntry::new(BALLISTA_PLUGIN_DIR.to_string(),
                              "Sets the plugin dir".to_string(),
                              DataType::Utf8, Some("".to_string())),
@@ -224,6 +230,10 @@ impl BallistaConfig {
         self.get_bool_setting(BALLISTA_PARQUET_PRUNING)
     }
 
+    pub fn collect_statistics(&self) -> bool {
+        self.get_bool_setting(BALLISTA_COLLECT_STATISTICS)
+    }
+
     pub fn default_with_information_schema(&self) -> bool {
         self.get_bool_setting(BALLISTA_WITH_INFORMATION_SCHEMA)
     }
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index b187d8e9..628e8c6f 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -22,6 +22,7 @@ use std::iter::FromIterator;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
+use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
 use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -362,9 +363,14 @@ impl UnresolvedStage {
             &input_locations,
         )?;
 
-        // Optimize join order based on new resolved statistics
+        // Optimize plan based on new resolved statistics
         let optimize_join = JoinSelection::new();
-        let plan = optimize_join.optimize(plan, &SessionConfig::new())?;
+        let optimize_aggregate = AggregateStatistics::new();
+
+        let cfg: SessionConfig = SessionConfig::new();
+
+        let plan: Arc<dyn ExecutionPlan> = optimize_join.optimize(plan, &cfg)?;
+        let plan: Arc<dyn ExecutionPlan> = optimize_aggregate.optimize(plan, &cfg)?;
 
         Ok(ResolvedStage::new(
             self.stage_id,
diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs
index eb7df9f2..56701d0b 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -121,6 +121,7 @@ pub fn create_datafusion_context(
         .with_repartition_joins(ballista_config.repartition_joins())
         .with_repartition_aggregations(ballista_config.repartition_aggregations())
         .with_repartition_windows(ballista_config.repartition_windows())
+        .with_collect_statistics(ballista_config.collect_statistics())
         .with_parquet_pruning(ballista_config.parquet_pruning());
     let config = propagate_ballista_configs(config, ballista_config);
 
@@ -142,6 +143,7 @@ pub fn update_datafusion_context(
             .with_repartition_joins(ballista_config.repartition_joins())
             .with_repartition_aggregations(ballista_config.repartition_aggregations())
             .with_repartition_windows(ballista_config.repartition_windows())
+            .with_collect_statistics(ballista_config.collect_statistics())
             .with_parquet_pruning(ballista_config.parquet_pruning());
         let config = propagate_ballista_configs(config, ballista_config);
         mut_state.config = config;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index aabfea6f..624cf9e4 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -20,7 +20,7 @@
 use ballista::context::BallistaContext;
 use ballista::prelude::{
     BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
-    BALLISTA_JOB_NAME,
+    BALLISTA_JOB_NAME, BALLISTA_COLLECT_STATISTICS,
 };
 use datafusion::arrow::array::*;
 use datafusion::arrow::util::display::array_value_to_string;
@@ -363,6 +363,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
             &format!("Query derived from TPC-H q{}", opt.query),
         )
         .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
+        .set(BALLISTA_COLLECT_STATISTICS, "true")
         .build()
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
 
diff --git a/python/src/context.rs b/python/src/context.rs
index 26c5661a..8b56f690 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -72,6 +72,7 @@ impl PySessionContext {
         repartition_windows: bool,
         parquet_pruning: bool,
         target_partitions: Option<usize>,
+        collect_statistics: bool,
         // TODO: config_options
     ) -> Self {
         let cfg = SessionConfig::new()
@@ -81,6 +82,7 @@ impl PySessionContext {
             .with_repartition_joins(repartition_joins)
             .with_repartition_aggregations(repartition_aggregations)
             .with_repartition_windows(repartition_windows)
+            .with_collect_statistics(collect_statistics)
             .with_parquet_pruning(parquet_pruning);
 
         let cfg_full = match target_partitions {