You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/05/30 16:50:15 UTC
[arrow-ballista] branch main updated: Add config to collect statistics, enable in TPC-H benchmark (#796)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 0d3a5119 Add config to collect statistics, enable in TPC-H benchmark (#796)
0d3a5119 is described below
commit 0d3a511986f547426ba1bd57850786f89c463c97
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Tue May 30 18:50:10 2023 +0200
Add config to collect statistics, enable in TPC-H benchmark (#796)
* Add config to collect statistics
* Add config to collect statistics
* Add config to collect statistics
* Merge
* Merge
* Add threshold config option
* Fix
* Set to DF default
* Format
---------
Co-authored-by: Daniël Heres <da...@coralogix.com>
---
ballista/client/src/prelude.rs | 9 ++++----
ballista/core/src/config.rs | 29 +++++++++++++++++++++++++
ballista/scheduler/src/state/session_manager.rs | 5 +++++
benchmarks/src/bin/tpch.rs | 5 +++--
4 files changed, 42 insertions(+), 6 deletions(-)
diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs
index 5b73728e..46bfa61b 100644
--- a/ballista/client/src/prelude.rs
+++ b/ballista/client/src/prelude.rs
@@ -19,10 +19,11 @@
pub use ballista_core::{
config::{
- BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
- BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING, BALLISTA_PLUGIN_DIR,
- BALLISTA_REPARTITION_AGGREGATIONS, BALLISTA_REPARTITION_JOINS,
- BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA,
+ BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
+ BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING,
+ BALLISTA_PLUGIN_DIR, BALLISTA_REPARTITION_AGGREGATIONS,
+ BALLISTA_REPARTITION_JOINS, BALLISTA_REPARTITION_WINDOWS,
+ BALLISTA_WITH_INFORMATION_SCHEMA,
},
error::{BallistaError, Result},
};
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index 6999dd4f..decd05dc 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -29,11 +29,15 @@ use datafusion::arrow::datatypes::DataType;
pub const BALLISTA_JOB_NAME: &str = "ballista.job.name";
pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions";
+pub const BALLISTA_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
+ "ballista.optimizer.hash_join_single_partition_threshold";
pub const BALLISTA_DEFAULT_BATCH_SIZE: &str = "ballista.batch.size";
pub const BALLISTA_REPARTITION_JOINS: &str = "ballista.repartition.joins";
pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = "ballista.repartition.aggregations";
pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows";
pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning";
+pub const BALLISTA_COLLECT_STATISTICS: &str = "ballista.collect_statistics";
+
pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema";
/// give a plugin files dir, and then the dynamic library files in this dir will be load when scheduler state init.
pub const BALLISTA_PLUGIN_DIR: &str = "ballista.plugin_dir";
@@ -138,6 +142,16 @@ impl BallistaConfig {
.parse::<usize>()
.map_err(|e| format!("{e:?}"))?;
}
+ DataType::UInt32 => {
+ val.to_string()
+ .parse::<usize>()
+ .map_err(|e| format!("{e:?}"))?;
+ }
+ DataType::UInt64 => {
+ val.to_string()
+ .parse::<usize>()
+ .map_err(|e| format!("{e:?}"))?;
+ }
DataType::Boolean => {
val.to_string()
.parse::<bool>()
@@ -181,6 +195,13 @@ impl BallistaConfig {
ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
"Sets whether enable information_schema".to_string(),
DataType::Boolean, Some("false".to_string())),
+ ConfigEntry::new(BALLISTA_HASH_JOIN_SINGLE_PARTITION_THRESHOLD.to_string(),
+ "Sets threshold in bytes for collecting the smaller side of the hash join in memory".to_string(),
+ DataType::UInt64, Some((1024 * 1024).to_string())),
+ ConfigEntry::new(BALLISTA_COLLECT_STATISTICS.to_string(),
+ "Configuration for collecting statistics during scan".to_string(),
+ DataType::Boolean, Some("false".to_string())
+ ),
ConfigEntry::new(BALLISTA_PLUGIN_DIR.to_string(),
"Sets the plugin dir".to_string(),
DataType::Utf8, Some("".to_string())),
@@ -207,6 +228,10 @@ impl BallistaConfig {
self.get_usize_setting(BALLISTA_DEFAULT_BATCH_SIZE)
}
+ pub fn hash_join_single_partition_threshold(&self) -> usize {
+ self.get_usize_setting(BALLISTA_HASH_JOIN_SINGLE_PARTITION_THRESHOLD)
+ }
+
pub fn repartition_joins(&self) -> bool {
self.get_bool_setting(BALLISTA_REPARTITION_JOINS)
}
@@ -223,6 +248,10 @@ impl BallistaConfig {
self.get_bool_setting(BALLISTA_PARQUET_PRUNING)
}
+ pub fn collect_statistics(&self) -> bool {
+ self.get_bool_setting(BALLISTA_COLLECT_STATISTICS)
+ }
+
pub fn default_with_information_schema(&self) -> bool {
self.get_bool_setting(BALLISTA_WITH_INFORMATION_SCHEMA)
}
diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs
index e6f99603..d1e0afce 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -73,7 +73,12 @@ pub fn create_datafusion_context(
.with_repartition_joins(ballista_config.repartition_joins())
.with_repartition_aggregations(ballista_config.repartition_aggregations())
.with_repartition_windows(ballista_config.repartition_windows())
+ .with_collect_statistics(ballista_config.collect_statistics())
.with_parquet_pruning(ballista_config.parquet_pruning())
+ .set_usize(
+ "datafusion.optimizer.hash_join_single_partition_threshold",
+ ballista_config.hash_join_single_partition_threshold(),
+ )
.set_bool("datafusion.optimizer.enable_round_robin_repartition", false);
let session_state = session_builder(config);
Arc::new(SessionContext::with_state(session_state))
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 061a2da1..c0cfe0c5 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -19,8 +19,8 @@
use ballista::context::BallistaContext;
use ballista::prelude::{
- BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
- BALLISTA_JOB_NAME,
+ BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
+ BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME,
};
use datafusion::arrow::array::*;
use datafusion::arrow::util::display::array_value_to_string;
@@ -364,6 +364,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
&format!("Query derived from TPC-H q{}", opt.query),
)
.set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
+ .set(BALLISTA_COLLECT_STATISTICS, "true")
.build()
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;