You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/11 02:35:46 UTC

[arrow-datafusion] branch master updated: Enable multi-statement benchmark queries (#2507)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 559c294aa Enable multi-statement benchmark queries (#2507)
559c294aa is described below

commit 559c294aa49cfa4c4ba4f08778d30eee6a02ca62
Author: Andy Grove <ag...@apache.org>
AuthorDate: Tue May 10 20:35:42 2022 -0600

    Enable multi-statement benchmark queries (#2507)
---
 benchmarks/src/bin/tpch.rs | 161 ++++++++++++++++++++++++++-------------------
 1 file changed, 92 insertions(+), 69 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index d87a9c042..8542dc3dc 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -313,8 +313,10 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
     let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
     for i in 0..opt.iterations {
         let start = Instant::now();
-        let plan = create_logical_plan(&ctx, opt.query)?;
-        result = execute_query(&ctx, &plan, opt.debug).await?;
+        let plans = create_logical_plans(&ctx, opt.query)?;
+        for plan in plans {
+            result = execute_query(&ctx, &plan, opt.debug).await?;
+        }
         let elapsed = start.elapsed().as_secs_f64() * 1000.0;
         millis.push(elapsed as f64);
         let row_count = result.iter().map(|b| b.num_rows()).sum();
@@ -362,20 +364,26 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
     let mut millis = vec![];
 
     // run benchmark
-    let sql = get_query_sql(opt.query)?;
-    println!("Running benchmark with query {}:\n {}", opt.query, sql);
+    let queries = get_query_sql(opt.query)?;
+    println!(
+        "Running benchmark with queries {}:\n {:?}",
+        opt.query, queries
+    );
+    let mut batches = vec![];
     for i in 0..opt.iterations {
         let start = Instant::now();
-        let df = ctx
-            .sql(&sql)
-            .await
-            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
-            .unwrap();
-        let batches = df
-            .collect()
-            .await
-            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
-            .unwrap();
+        for sql in &queries {
+            let df = ctx
+                .sql(sql)
+                .await
+                .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                .unwrap();
+            batches = df
+                .collect()
+                .await
+                .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                .unwrap();
+        }
         let elapsed = start.elapsed().as_secs_f64() * 1000.0;
         millis.push(elapsed as f64);
         let row_count = batches.iter().map(|b| b.num_rows()).sum();
@@ -571,7 +579,8 @@ async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
     }
 }
 
-fn get_query_sql(query: usize) -> Result<String> {
+/// Get the SQL statements from the specified query file
+fn get_query_sql(query: usize) -> Result<Vec<String>> {
     if query > 0 && query < 23 {
         let possibilities = vec![
             format!("queries/q{}.sql", query),
@@ -580,7 +589,14 @@ fn get_query_sql(query: usize) -> Result<String> {
         let mut errors = vec![];
         for filename in possibilities {
             match fs::read_to_string(&filename) {
-                Ok(contents) => return Ok(contents),
+                Ok(contents) => {
+                    return Ok(contents
+                        .split(';')
+                        .map(|s| s.trim())
+                        .filter(|s| !s.is_empty())
+                        .map(|s| s.to_string())
+                        .collect())
+                }
                 Err(e) => errors.push(format!("{}: {}", filename, e)),
             };
         }
@@ -595,9 +611,12 @@ fn get_query_sql(query: usize) -> Result<String> {
     }
 }
 
-fn create_logical_plan(ctx: &SessionContext, query: usize) -> Result<LogicalPlan> {
+/// Create a logical plan for each query in the specified query file
+fn create_logical_plans(ctx: &SessionContext, query: usize) -> Result<Vec<LogicalPlan>> {
     let sql = get_query_sql(query)?;
-    ctx.create_logical_plan(&sql)
+    sql.iter()
+        .map(|sql| ctx.create_logical_plan(sql.as_str()))
+        .collect::<Result<Vec<_>>>()
 }
 
 async fn execute_query(
@@ -1349,7 +1368,7 @@ mod tests {
     }
 
     async fn run_query(n: usize) -> Result<()> {
-        // Tests running query with empty tables, to see whether they run succesfully.
+        // Tests running query with empty tables, to see whether they run successfully.
 
         let config = SessionConfig::new()
             .with_target_partitions(1)
@@ -1365,8 +1384,10 @@ mod tests {
             ctx.register_table(table, Arc::new(provider))?;
         }
 
-        let plan = create_logical_plan(&ctx, n)?;
-        execute_query(&ctx, &plan, false).await?;
+        let plans = create_logical_plans(&ctx, n)?;
+        for plan in plans {
+            execute_query(&ctx, &plan, false).await?;
+        }
 
         Ok(())
     }
@@ -1482,61 +1503,63 @@ mod tests {
             }
 
             // test logical plan round trip
-            let plan = create_logical_plan(&ctx, n)?;
-            let proto: protobuf::LogicalPlanNode =
-                protobuf::LogicalPlanNode::try_from_logical_plan(
-                    &plan,
-                    codec.logical_extension_codec(),
-                )
-                .unwrap();
-            let round_trip: LogicalPlan = (&proto)
-                .try_into_logical_plan(&ctx, codec.logical_extension_codec())
-                .unwrap();
-            assert_eq!(
-                format!("{:?}", plan),
-                format!("{:?}", round_trip),
-                "logical plan round trip failed"
-            );
-
-            // test optimized logical plan round trip
-            let plan = ctx.optimize(&plan)?;
-            let proto: protobuf::LogicalPlanNode =
-                protobuf::LogicalPlanNode::try_from_logical_plan(
-                    &plan,
-                    codec.logical_extension_codec(),
-                )
-                .unwrap();
-            let round_trip: LogicalPlan = (&proto)
-                .try_into_logical_plan(&ctx, codec.logical_extension_codec())
-                .unwrap();
-            assert_eq!(
-                format!("{:?}", plan),
-                format!("{:?}", round_trip),
-                "optimized logical plan round trip failed"
-            );
-
-            // test physical plan roundtrip
-            if env::var("TPCH_DATA").is_ok() {
-                let physical_plan = ctx.create_physical_plan(&plan).await?;
-                let proto: protobuf::PhysicalPlanNode =
-                    protobuf::PhysicalPlanNode::try_from_physical_plan(
-                        physical_plan.clone(),
-                        codec.physical_extension_codec(),
+            let plans = create_logical_plans(&ctx, n)?;
+            for plan in plans {
+                let proto: protobuf::LogicalPlanNode =
+                    protobuf::LogicalPlanNode::try_from_logical_plan(
+                        &plan,
+                        codec.logical_extension_codec(),
                     )
                     .unwrap();
-                let runtime = ctx.runtime_env();
-                let round_trip: Arc<dyn ExecutionPlan> = (&proto)
-                    .try_into_physical_plan(
-                        &ctx,
-                        runtime.deref(),
-                        codec.physical_extension_codec(),
+                let round_trip: LogicalPlan = (&proto)
+                    .try_into_logical_plan(&ctx, codec.logical_extension_codec())
+                    .unwrap();
+                assert_eq!(
+                    format!("{:?}", plan),
+                    format!("{:?}", round_trip),
+                    "logical plan round trip failed"
+                );
+
+                // test optimized logical plan round trip
+                let plan = ctx.optimize(&plan)?;
+                let proto: protobuf::LogicalPlanNode =
+                    protobuf::LogicalPlanNode::try_from_logical_plan(
+                        &plan,
+                        codec.logical_extension_codec(),
                     )
                     .unwrap();
+                let round_trip: LogicalPlan = (&proto)
+                    .try_into_logical_plan(&ctx, codec.logical_extension_codec())
+                    .unwrap();
                 assert_eq!(
-                    format!("{:?}", physical_plan),
+                    format!("{:?}", plan),
                     format!("{:?}", round_trip),
-                    "physical plan round trip failed"
+                    "optimized logical plan round trip failed"
                 );
+
+                // test physical plan roundtrip
+                if env::var("TPCH_DATA").is_ok() {
+                    let physical_plan = ctx.create_physical_plan(&plan).await?;
+                    let proto: protobuf::PhysicalPlanNode =
+                        protobuf::PhysicalPlanNode::try_from_physical_plan(
+                            physical_plan.clone(),
+                            codec.physical_extension_codec(),
+                        )
+                        .unwrap();
+                    let runtime = ctx.runtime_env();
+                    let round_trip: Arc<dyn ExecutionPlan> = (&proto)
+                        .try_into_physical_plan(
+                            &ctx,
+                            runtime.deref(),
+                            codec.physical_extension_codec(),
+                        )
+                        .unwrap();
+                    assert_eq!(
+                        format!("{:?}", physical_plan),
+                        format!("{:?}", round_trip),
+                        "physical plan round trip failed"
+                    );
+                }
             }
 
             Ok(())