Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/09 00:25:01 UTC

[arrow-datafusion] branch master updated: Combined TPCH runs & uniformed summaries for benchmarks (#4128)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a32fb657d Combined TPCH runs & uniformed summaries for benchmarks (#4128)
a32fb657d is described below

commit a32fb657d1caec634cb53979b2f7ef2fad224905
Author: Batuhan Taskaya <is...@gmail.com>
AuthorDate: Wed Nov 9 03:24:56 2022 +0300

    Combined TPCH runs & uniformed summaries for benchmarks (#4128)
---
 benchmarks/README.md       |  11 +++++
 benchmarks/src/bin/tpch.rs | 117 ++++++++++++++++++++++++++++++++++-----------
 2 files changed, 100 insertions(+), 28 deletions(-)

diff --git a/benchmarks/README.md b/benchmarks/README.md
index 65296da8d..f2f581fd2 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -49,6 +49,11 @@ The benchmark can then be run (assuming the data created from `dbgen` is in `./d
 cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
 ```
 
+If you omit the `--query=<query_id>` argument, all queries will be run sequentially, from query 1 through query 22:
+```bash
+cargo run --release --bin tpch -- benchmark datafusion --iterations 1 --path ./data --format tbl --batch-size 4096
+```
+
 You can enable the `simd` feature (to use SIMD instructions; a nightly Rust toolchain is required) and/or the `mimalloc` or `snmalloc` features (to use the mimalloc or snmalloc allocator) by passing them in as `--features`:
 
 ```
@@ -69,6 +74,12 @@ cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parq
 
 Alternatively, if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.
 
+### Machine readable benchmark summary
+
+Any `tpch` benchmark execution with the `-o <dir>` argument will produce a summary file directly
+under the `<dir>` directory. It is a JSON-serialized record of every query run, together with
+runtime metadata (number of cores, DataFusion version, etc.).
+
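+For example, running the benchmark with `-o /tmp/summaries` (an illustrative path) writes a file
+named `tpch-summary--<start_time>.json` into that directory:
+
+```bash
+cargo run --release --bin tpch -- benchmark datafusion --iterations 1 --path ./data --format tbl -o /tmp/summaries
+```
+
+An abridged sketch of the summary contents is shown below; the field names follow the
+`BenchmarkRun`, `RunContext`, and `QueryRun` structs in this crate, and the values are placeholders:
+
+```json
+{
+  "context": {
+    "benchmark_version": "...",
+    "datafusion_version": "...",
+    "start_time": 1667953496,
+    "arguments": ["benchmark", "datafusion", "--iterations", "1", "--path", "./data"]
+  },
+  "queries": [
+    { "query": 1, "start_time": 1667953496, "iterations": [] }
+  ]
+}
+```
+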
 ## Expected output
 
 The result of query 1 should produce the following output when executed against the SF=1 dataset.
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 33a0443ee..f0977fe32 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -62,9 +62,9 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[derive(Debug, StructOpt, Clone)]
 struct DataFusionBenchmarkOpt {
-    /// Query number
+    /// Query number. If not specified, runs all queries
     #[structopt(short, long)]
-    query: usize,
+    query: Option<usize>,
 
     /// Activate debug mode to see query results
     #[structopt(short, long)]
@@ -182,9 +182,37 @@ async fn main() -> Result<()> {
     }
 }
 
-async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
+const TPCH_QUERY_START_ID: usize = 1;
+const TPCH_QUERY_END_ID: usize = 22;
+
+async fn benchmark_datafusion(
+    opt: DataFusionBenchmarkOpt,
+) -> Result<Vec<Vec<RecordBatch>>> {
     println!("Running benchmarks with the following options: {:?}", opt);
-    let mut benchmark_run = BenchmarkRun::new(opt.query);
+    let query_range = match opt.query {
+        Some(query_id) => query_id..=query_id,
+        None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
+    };
+
+    let mut benchmark_run = BenchmarkRun::new();
+    let mut results = vec![];
+    for query_id in query_range {
+        let (query_run, result) = benchmark_query(&opt, query_id).await?;
+        results.push(result);
+        benchmark_run.add_query(query_run);
+    }
+
+    if let Some(path) = &opt.output_path {
+        write_summary_json(&mut benchmark_run, path)?;
+    }
+    Ok(results)
+}
+
+async fn benchmark_query(
+    opt: &DataFusionBenchmarkOpt,
+    query_id: usize,
+) -> Result<(QueryRun, Vec<RecordBatch>)> {
+    let mut benchmark_run = QueryRun::new(query_id);
     let config = SessionConfig::new()
         .with_target_partitions(opt.partitions)
         .with_batch_size(opt.batch_size)
@@ -192,7 +220,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
     let ctx = SessionContext::with_config(config);
 
     // register tables
-    register_tables(&opt, &ctx).await?;
+    register_tables(opt, &ctx).await?;
 
     let mut millis = vec![];
     // run benchmark
@@ -200,11 +228,11 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
     for i in 0..opt.iterations {
         let start = Instant::now();
 
-        let sql = &get_query_sql(opt.query)?;
+        let sql = &get_query_sql(query_id)?;
 
         // query 15 is special, with 3 statements. the second statement is the one from which we
         // want to capture the results
-        if opt.query == 15 {
+        if query_id == 15 {
             for (n, query) in sql.iter().enumerate() {
                 if n == 1 {
                     result = execute_query(&ctx, query, opt.debug).await?;
@@ -223,19 +251,15 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
         let row_count = result.iter().map(|b| b.num_rows()).sum();
         println!(
             "Query {} iteration {} took {:.1} ms and returned {} rows",
-            opt.query, i, elapsed, row_count
+            query_id, i, elapsed, row_count
         );
         benchmark_run.add_result(elapsed, row_count);
     }
 
     let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {} avg time: {:.2} ms", opt.query, avg);
+    println!("Query {} avg time: {:.2} ms", query_id, avg);
 
-    if let Some(path) = &opt.output_path {
-        write_summary_json(&mut benchmark_run, path)?;
-    }
-
-    Ok(result)
+    Ok((benchmark_run, result))
 }
 
 #[allow(clippy::await_holding_lock)]
@@ -278,10 +302,7 @@ async fn register_tables(
 fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
     let json =
         serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
-    let filename = format!(
-        "tpch-q{}-{}.json",
-        benchmark_run.query, benchmark_run.start_time
-    );
+    let filename = format!("tpch-summary--{}.json", benchmark_run.context.start_time);
     let path = path.join(filename);
     println!(
         "Writing summary file to {}",
@@ -391,7 +412,7 @@ async fn get_table(
 }
 
 #[derive(Debug, Serialize)]
-struct BenchmarkRun {
+struct RunContext {
     /// Benchmark crate version
     benchmark_version: String,
     /// DataFusion crate version
@@ -402,14 +423,10 @@ struct BenchmarkRun {
     start_time: u64,
     /// CLI arguments
     arguments: Vec<String>,
-    /// query number
-    query: usize,
-    /// list of individual run times and row counts
-    iterations: Vec<QueryResult>,
 }
 
-impl BenchmarkRun {
-    fn new(query: usize) -> Self {
+impl RunContext {
+    fn new() -> Self {
         Self {
             benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
             datafusion_version: DATAFUSION_VERSION.to_owned(),
@@ -422,8 +439,50 @@ impl BenchmarkRun {
                 .skip(1)
                 .into_iter()
                 .collect::<Vec<String>>(),
+        }
+    }
+}
+
+#[derive(Debug, Serialize)]
+struct BenchmarkRun {
+    /// Information regarding the environment in which the benchmark was run
+    context: RunContext,
+    /// Per-query summaries
+    queries: Vec<QueryRun>,
+}
+
+impl BenchmarkRun {
+    fn new() -> Self {
+        Self {
+            context: RunContext::new(),
+            queries: vec![],
+        }
+    }
+
+    fn add_query(&mut self, query: QueryRun) {
+        self.queries.push(query)
+    }
+}
+
+#[derive(Debug, Serialize)]
+struct QueryRun {
+    /// query number
+    query: usize,
+    /// list of individual run times and row counts
+    iterations: Vec<QueryResult>,
+    /// Start time
+    start_time: u64,
+}
+
+impl QueryRun {
+    fn new(query: usize) -> Self {
+        Self {
             query,
             iterations: vec![],
+            start_time: SystemTime::now()
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .expect("current time is later than UNIX_EPOCH")
+                .as_secs(),
         }
     }
 
@@ -775,7 +834,7 @@ mod ci {
         let ctx = SessionContext::default();
         let path = get_tpch_data_path()?;
         let opt = DataFusionBenchmarkOpt {
-            query,
+            query: Some(query),
             debug: false,
             iterations: 1,
             partitions: 2,
@@ -1087,7 +1146,7 @@ mod ci {
 
         // run the query to compute actual results of the query
         let opt = DataFusionBenchmarkOpt {
-            query: n,
+            query: Some(n),
             debug: false,
             iterations: 1,
             partitions: 2,
@@ -1098,8 +1157,10 @@ mod ci {
             output_path: None,
             disable_statistics: false,
         };
-        let actual = benchmark_datafusion(opt).await?;
+        let mut results = benchmark_datafusion(opt).await?;
+        assert_eq!(results.len(), 1);
 
+        let actual = results.remove(0);
         let transformed = transform_actual_result(actual, n).await?;
 
         // assert schema data types match