You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/05/30 16:50:15 UTC

[arrow-ballista] branch main updated: Add config to collect statistics, enable in TPC-H benchmark (#796)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d3a5119 Add config to collect statistics, enable in TPC-H benchmark (#796)
0d3a5119 is described below

commit 0d3a511986f547426ba1bd57850786f89c463c97
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Tue May 30 18:50:10 2023 +0200

    Add config to collect statistics, enable in TPC-H benchmark (#796)
    
    * Add config to collect statistics
    
    * Add config to collect statistics
    
    * Add config to collect statistics
    
    * Merge
    
    * Merge
    
    * Add threshold config option
    
    * Fix
    
    * Set to DF default
    
    * Format
    
    ---------
    
    Co-authored-by: Daniël Heres <da...@coralogix.com>
---
 ballista/client/src/prelude.rs                  |  9 ++++----
 ballista/core/src/config.rs                     | 29 +++++++++++++++++++++++++
 ballista/scheduler/src/state/session_manager.rs |  5 +++++
 benchmarks/src/bin/tpch.rs                      |  5 +++--
 4 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs
index 5b73728e..46bfa61b 100644
--- a/ballista/client/src/prelude.rs
+++ b/ballista/client/src/prelude.rs
@@ -19,10 +19,11 @@
 
 pub use ballista_core::{
     config::{
-        BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
-        BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING, BALLISTA_PLUGIN_DIR,
-        BALLISTA_REPARTITION_AGGREGATIONS, BALLISTA_REPARTITION_JOINS,
-        BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA,
+        BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
+        BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING,
+        BALLISTA_PLUGIN_DIR, BALLISTA_REPARTITION_AGGREGATIONS,
+        BALLISTA_REPARTITION_JOINS, BALLISTA_REPARTITION_WINDOWS,
+        BALLISTA_WITH_INFORMATION_SCHEMA,
     },
     error::{BallistaError, Result},
 };
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index 6999dd4f..decd05dc 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -29,11 +29,15 @@ use datafusion::arrow::datatypes::DataType;
 
 pub const BALLISTA_JOB_NAME: &str = "ballista.job.name";
 pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions";
+pub const BALLISTA_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
+    "ballista.optimizer.hash_join_single_partition_threshold";
 pub const BALLISTA_DEFAULT_BATCH_SIZE: &str = "ballista.batch.size";
 pub const BALLISTA_REPARTITION_JOINS: &str = "ballista.repartition.joins";
 pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = "ballista.repartition.aggregations";
 pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows";
 pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning";
+pub const BALLISTA_COLLECT_STATISTICS: &str = "ballista.collect_statistics";
+
 pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema";
 /// give a plugin files dir, and then the dynamic library files in this dir will be load when scheduler state init.
 pub const BALLISTA_PLUGIN_DIR: &str = "ballista.plugin_dir";
@@ -138,6 +142,16 @@ impl BallistaConfig {
                     .parse::<usize>()
                     .map_err(|e| format!("{e:?}"))?;
             }
+            DataType::UInt32 => {
+                val.to_string()
+                    .parse::<usize>()
+                    .map_err(|e| format!("{e:?}"))?;
+            }
+            DataType::UInt64 => {
+                val.to_string()
+                    .parse::<usize>()
+                    .map_err(|e| format!("{e:?}"))?;
+            }
             DataType::Boolean => {
                 val.to_string()
                     .parse::<bool>()
@@ -181,6 +195,13 @@ impl BallistaConfig {
             ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
                              "Sets whether enable information_schema".to_string(),
                              DataType::Boolean, Some("false".to_string())),
+            ConfigEntry::new(BALLISTA_HASH_JOIN_SINGLE_PARTITION_THRESHOLD.to_string(),
+                "Sets threshold in bytes for collecting the smaller side of the hash join in memory".to_string(),
+                DataType::UInt64, Some((1024 * 1024).to_string())),
+            ConfigEntry::new(BALLISTA_COLLECT_STATISTICS.to_string(),
+                "Configuration for collecting statistics during scan".to_string(),
+                DataType::Boolean, Some("false".to_string())
+            ),
             ConfigEntry::new(BALLISTA_PLUGIN_DIR.to_string(),
                              "Sets the plugin dir".to_string(),
                              DataType::Utf8, Some("".to_string())),
@@ -207,6 +228,10 @@ impl BallistaConfig {
         self.get_usize_setting(BALLISTA_DEFAULT_BATCH_SIZE)
     }
 
+    pub fn hash_join_single_partition_threshold(&self) -> usize {
+        self.get_usize_setting(BALLISTA_HASH_JOIN_SINGLE_PARTITION_THRESHOLD)
+    }
+
     pub fn repartition_joins(&self) -> bool {
         self.get_bool_setting(BALLISTA_REPARTITION_JOINS)
     }
@@ -223,6 +248,10 @@ impl BallistaConfig {
         self.get_bool_setting(BALLISTA_PARQUET_PRUNING)
     }
 
+    pub fn collect_statistics(&self) -> bool {
+        self.get_bool_setting(BALLISTA_COLLECT_STATISTICS)
+    }
+
     pub fn default_with_information_schema(&self) -> bool {
         self.get_bool_setting(BALLISTA_WITH_INFORMATION_SCHEMA)
     }
diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs
index e6f99603..d1e0afce 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -73,7 +73,12 @@ pub fn create_datafusion_context(
         .with_repartition_joins(ballista_config.repartition_joins())
         .with_repartition_aggregations(ballista_config.repartition_aggregations())
         .with_repartition_windows(ballista_config.repartition_windows())
+        .with_collect_statistics(ballista_config.collect_statistics())
         .with_parquet_pruning(ballista_config.parquet_pruning())
+        .set_usize(
+            "datafusion.optimizer.hash_join_single_partition_threshold",
+            ballista_config.hash_join_single_partition_threshold(),
+        )
         .set_bool("datafusion.optimizer.enable_round_robin_repartition", false);
     let session_state = session_builder(config);
     Arc::new(SessionContext::with_state(session_state))
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 061a2da1..c0cfe0c5 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -19,8 +19,8 @@
 
 use ballista::context::BallistaContext;
 use ballista::prelude::{
-    BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
-    BALLISTA_JOB_NAME,
+    BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
+    BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME,
 };
 use datafusion::arrow::array::*;
 use datafusion::arrow::util::display::array_value_to_string;
@@ -364,6 +364,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
             &format!("Query derived from TPC-H q{}", opt.query),
         )
         .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
+        .set(BALLISTA_COLLECT_STATISTICS, "true")
         .build()
         .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;