You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/09 00:25:01 UTC
[arrow-datafusion] branch master updated: Combined TPCH runs & uniformed summaries for benchmarks (#4128)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a32fb657d Combined TPCH runs & uniformed summaries for benchmarks (#4128)
a32fb657d is described below
commit a32fb657d1caec634cb53979b2f7ef2fad224905
Author: Batuhan Taskaya <is...@gmail.com>
AuthorDate: Wed Nov 9 03:24:56 2022 +0300
Combined TPCH runs & uniformed summaries for benchmarks (#4128)
---
benchmarks/README.md | 11 +++++
benchmarks/src/bin/tpch.rs | 117 ++++++++++++++++++++++++++++++++++-----------
2 files changed, 100 insertions(+), 28 deletions(-)
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 65296da8d..f2f581fd2 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -49,6 +49,11 @@ The benchmark can then be run (assuming the data created from `dbgen` is in `./d
cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
```
+If you omit the `--query=<query_id>` argument, all benchmarks will be run one by one (from query 1 to query 22).
+```bash
+cargo run --release --bin tpch -- benchmark datafusion --iterations 1 --path ./data --format tbl --batch-size 4096
+```
+
You can enable the features `simd` (to use SIMD instructions, `cargo nightly` is required.) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`:
```
@@ -69,6 +74,12 @@ cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parq
Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.
+### Machine readable benchmark summary
+
+Any `tpch` execution with the `-o <dir>` argument will produce a summary file directly under the `<dir>`
+directory. It is a JSON-serialized record of all the runs that took place, along with the runtime metadata
+(number of cores, DataFusion version, etc.).
+
## Expected output
The result of query 1 should produce the following output when executed against the SF=1 dataset.
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 33a0443ee..f0977fe32 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -62,9 +62,9 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[derive(Debug, StructOpt, Clone)]
struct DataFusionBenchmarkOpt {
- /// Query number
+ /// Query number. If not specified, runs all queries
#[structopt(short, long)]
- query: usize,
+ query: Option<usize>,
/// Activate debug mode to see query results
#[structopt(short, long)]
@@ -182,9 +182,37 @@ async fn main() -> Result<()> {
}
}
-async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
+const TPCH_QUERY_START_ID: usize = 1;
+const TPCH_QUERY_END_ID: usize = 22;
+
+async fn benchmark_datafusion(
+ opt: DataFusionBenchmarkOpt,
+) -> Result<Vec<Vec<RecordBatch>>> {
println!("Running benchmarks with the following options: {:?}", opt);
- let mut benchmark_run = BenchmarkRun::new(opt.query);
+ let query_range = match opt.query {
+ Some(query_id) => query_id..=query_id,
+ None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
+ };
+
+ let mut benchmark_run = BenchmarkRun::new();
+ let mut results = vec![];
+ for query_id in query_range {
+ let (query_run, result) = benchmark_query(&opt, query_id).await?;
+ results.push(result);
+ benchmark_run.add_query(query_run);
+ }
+
+ if let Some(path) = &opt.output_path {
+ write_summary_json(&mut benchmark_run, path)?;
+ }
+ Ok(results)
+}
+
+async fn benchmark_query(
+ opt: &DataFusionBenchmarkOpt,
+ query_id: usize,
+) -> Result<(QueryRun, Vec<RecordBatch>)> {
+ let mut benchmark_run = QueryRun::new(query_id);
let config = SessionConfig::new()
.with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size)
@@ -192,7 +220,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
let ctx = SessionContext::with_config(config);
// register tables
- register_tables(&opt, &ctx).await?;
+ register_tables(opt, &ctx).await?;
let mut millis = vec![];
// run benchmark
@@ -200,11 +228,11 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
for i in 0..opt.iterations {
let start = Instant::now();
- let sql = &get_query_sql(opt.query)?;
+ let sql = &get_query_sql(query_id)?;
// query 15 is special, with 3 statements. the second statement is the one from which we
// want to capture the results
- if opt.query == 15 {
+ if query_id == 15 {
for (n, query) in sql.iter().enumerate() {
if n == 1 {
result = execute_query(&ctx, query, opt.debug).await?;
@@ -223,19 +251,15 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
- opt.query, i, elapsed, row_count
+ query_id, i, elapsed, row_count
);
benchmark_run.add_result(elapsed, row_count);
}
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
- println!("Query {} avg time: {:.2} ms", opt.query, avg);
+ println!("Query {} avg time: {:.2} ms", query_id, avg);
- if let Some(path) = &opt.output_path {
- write_summary_json(&mut benchmark_run, path)?;
- }
-
- Ok(result)
+ Ok((benchmark_run, result))
}
#[allow(clippy::await_holding_lock)]
@@ -278,10 +302,7 @@ async fn register_tables(
fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
let json =
serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
- let filename = format!(
- "tpch-q{}-{}.json",
- benchmark_run.query, benchmark_run.start_time
- );
+ let filename = format!("tpch-summary--{}.json", benchmark_run.context.start_time);
let path = path.join(filename);
println!(
"Writing summary file to {}",
@@ -391,7 +412,7 @@ async fn get_table(
}
#[derive(Debug, Serialize)]
-struct BenchmarkRun {
+struct RunContext {
/// Benchmark crate version
benchmark_version: String,
/// DataFusion crate version
@@ -402,14 +423,10 @@ struct BenchmarkRun {
start_time: u64,
/// CLI arguments
arguments: Vec<String>,
- /// query number
- query: usize,
- /// list of individual run times and row counts
- iterations: Vec<QueryResult>,
}
-impl BenchmarkRun {
- fn new(query: usize) -> Self {
+impl RunContext {
+ fn new() -> Self {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
@@ -422,8 +439,50 @@ impl BenchmarkRun {
.skip(1)
.into_iter()
.collect::<Vec<String>>(),
+ }
+ }
+}
+
+#[derive(Debug, Serialize)]
+struct BenchmarkRun {
+ /// Information regarding the environment in which the benchmark was run
+ context: RunContext,
+ /// Per-query summaries
+ queries: Vec<QueryRun>,
+}
+
+impl BenchmarkRun {
+ fn new() -> Self {
+ Self {
+ context: RunContext::new(),
+ queries: vec![],
+ }
+ }
+
+ fn add_query(&mut self, query: QueryRun) {
+ self.queries.push(query)
+ }
+}
+
+#[derive(Debug, Serialize)]
+struct QueryRun {
+ /// query number
+ query: usize,
+ /// list of individual run times and row counts
+ iterations: Vec<QueryResult>,
+ /// Start time
+ start_time: u64,
+}
+
+impl QueryRun {
+ fn new(query: usize) -> Self {
+ Self {
query,
iterations: vec![],
+ start_time: SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .expect("current time is later than UNIX_EPOCH")
+ .as_secs(),
}
}
@@ -775,7 +834,7 @@ mod ci {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
let opt = DataFusionBenchmarkOpt {
- query,
+ query: Some(query),
debug: false,
iterations: 1,
partitions: 2,
@@ -1087,7 +1146,7 @@ mod ci {
// run the query to compute actual results of the query
let opt = DataFusionBenchmarkOpt {
- query: n,
+ query: Some(n),
debug: false,
iterations: 1,
partitions: 2,
@@ -1098,8 +1157,10 @@ mod ci {
output_path: None,
disable_statistics: false,
};
- let actual = benchmark_datafusion(opt).await?;
+ let mut results = benchmark_datafusion(opt).await?;
+ assert_eq!(results.len(), 1);
+ let actual = results.remove(0);
let transformed = transform_actual_result(actual, n).await?;
// assert schema data types match