You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/08/30 08:09:00 UTC

[arrow-datafusion] branch master updated: Execute LogicalPlans after building for TPCH Benchmarks (#3290)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3effee86f Execute LogicalPlans after building for TPCH Benchmarks (#3290)
3effee86f is described below

commit 3effee86f598c328d9d144a94f3a1d8872ad846c
Author: Dalton Modlin <31...@users.noreply.github.com>
AuthorDate: Tue Aug 30 02:08:53 2022 -0600

    Execute LogicalPlans after building for TPCH Benchmarks (#3290)
---
 benchmarks/src/bin/tpch.rs | 36 ++++++++++++++++--------------------
 1 file changed, 16 insertions(+), 20 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 08374bfcc..43db654e8 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -28,7 +28,6 @@ use std::{
 
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
-use datafusion::logical_plan::LogicalPlan;
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -196,10 +195,12 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
     let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
     for i in 0..opt.iterations {
         let start = Instant::now();
-        let plans = create_logical_plans(&ctx, opt.query)?;
-        for plan in plans {
-            result = execute_query(&ctx, &plan, opt.debug).await?;
+
+        let sql = &get_query_sql(opt.query)?;
+        for query in sql {
+            result = execute_query(&ctx, query, opt.debug).await?;
         }
+
         let elapsed = start.elapsed().as_secs_f64() * 1000.0;
         millis.push(elapsed as f64);
         let row_count = result.iter().map(|b| b.num_rows()).sum();
@@ -253,7 +254,7 @@ fn get_query_sql(query: usize) -> Result<Vec<String>> {
                         .map(|s| s.trim())
                         .filter(|s| !s.is_empty())
                         .map(|s| s.to_string())
-                        .collect())
+                        .collect());
                 }
                 Err(e) => errors.push(format!("{}: {}", filename, e)),
             };
@@ -269,23 +270,18 @@ fn get_query_sql(query: usize) -> Result<Vec<String>> {
     }
 }
 
-/// Create a logical plan for each query in the specified query file
-fn create_logical_plans(ctx: &SessionContext, query: usize) -> Result<Vec<LogicalPlan>> {
-    let sql = get_query_sql(query)?;
-    sql.iter()
-        .map(|sql| ctx.create_logical_plan(sql.as_str()))
-        .collect::<Result<Vec<_>>>()
-}
-
 async fn execute_query(
     ctx: &SessionContext,
-    plan: &LogicalPlan,
+    sql: &str,
     debug: bool,
 ) -> Result<Vec<RecordBatch>> {
+    let plan = ctx.sql(sql).await?;
+    let plan = plan.to_logical_plan()?;
+
     if debug {
         println!("=== Logical plan ===\n{:?}\n", plan);
     }
-    let plan = ctx.optimize(plan)?;
+    let plan = ctx.optimize(&plan)?;
     if debug {
         println!("=== Optimized logical plan ===\n{:?}\n", plan);
     }
@@ -357,7 +353,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
                         return Err(DataFusionError::NotImplemented(format!(
                             "Invalid compression format: {}",
                             other
-                        )))
+                        )));
                     }
                 };
                 let props = WriterProperties::builder()
@@ -369,7 +365,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
                 return Err(DataFusionError::NotImplemented(format!(
                     "Invalid output format: {}",
                     other
-                )))
+                )));
             }
         }
         println!("Conversion completed in {} ms", start.elapsed().as_millis());
@@ -1022,9 +1018,9 @@ mod tests {
             ctx.register_table(table, Arc::new(provider))?;
         }
 
-        let plans = create_logical_plans(&ctx, n)?;
-        for plan in plans {
-            execute_query(&ctx, &plan, false).await?;
+        let sql = &get_query_sql(n)?;
+        for query in sql {
+            execute_query(&ctx, query, false).await?;
         }
 
         Ok(())