You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original page and was not preserved in this text version.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/11 02:35:46 UTC
[arrow-datafusion] branch master updated: Enable multi-statement benchmark queries (#2507)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 559c294aa Enable multi-statement benchmark queries (#2507)
559c294aa is described below
commit 559c294aa49cfa4c4ba4f08778d30eee6a02ca62
Author: Andy Grove <ag...@apache.org>
AuthorDate: Tue May 10 20:35:42 2022 -0600
Enable multi-statement benchmark queries (#2507)
---
benchmarks/src/bin/tpch.rs | 161 ++++++++++++++++++++++++++-------------------
1 file changed, 92 insertions(+), 69 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index d87a9c042..8542dc3dc 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -313,8 +313,10 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
for i in 0..opt.iterations {
let start = Instant::now();
- let plan = create_logical_plan(&ctx, opt.query)?;
- result = execute_query(&ctx, &plan, opt.debug).await?;
+ let plans = create_logical_plans(&ctx, opt.query)?;
+ for plan in plans {
+ result = execute_query(&ctx, &plan, opt.debug).await?;
+ }
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
let row_count = result.iter().map(|b| b.num_rows()).sum();
@@ -362,20 +364,26 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
let mut millis = vec![];
// run benchmark
- let sql = get_query_sql(opt.query)?;
- println!("Running benchmark with query {}:\n {}", opt.query, sql);
+ let queries = get_query_sql(opt.query)?;
+ println!(
+ "Running benchmark with queries {}:\n {:?}",
+ opt.query, queries
+ );
+ let mut batches = vec![];
for i in 0..opt.iterations {
let start = Instant::now();
- let df = ctx
- .sql(&sql)
- .await
- .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
- .unwrap();
- let batches = df
- .collect()
- .await
- .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
- .unwrap();
+ for sql in &queries {
+ let df = ctx
+ .sql(sql)
+ .await
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+ .unwrap();
+ batches = df
+ .collect()
+ .await
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+ .unwrap();
+ }
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
let row_count = batches.iter().map(|b| b.num_rows()).sum();
@@ -571,7 +579,8 @@ async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
}
}
-fn get_query_sql(query: usize) -> Result<String> {
+/// Get the SQL statements from the specified query file
+fn get_query_sql(query: usize) -> Result<Vec<String>> {
if query > 0 && query < 23 {
let possibilities = vec![
format!("queries/q{}.sql", query),
@@ -580,7 +589,14 @@ fn get_query_sql(query: usize) -> Result<String> {
let mut errors = vec![];
for filename in possibilities {
match fs::read_to_string(&filename) {
- Ok(contents) => return Ok(contents),
+ Ok(contents) => {
+ return Ok(contents
+ .split(';')
+ .map(|s| s.trim())
+ .filter(|s| !s.is_empty())
+ .map(|s| s.to_string())
+ .collect())
+ }
Err(e) => errors.push(format!("{}: {}", filename, e)),
};
}
@@ -595,9 +611,12 @@ fn get_query_sql(query: usize) -> Result<String> {
}
}
-fn create_logical_plan(ctx: &SessionContext, query: usize) -> Result<LogicalPlan> {
+/// Create a logical plan for each query in the specified query file
+fn create_logical_plans(ctx: &SessionContext, query: usize) -> Result<Vec<LogicalPlan>> {
let sql = get_query_sql(query)?;
- ctx.create_logical_plan(&sql)
+ sql.iter()
+ .map(|sql| ctx.create_logical_plan(sql.as_str()))
+ .collect::<Result<Vec<_>>>()
}
async fn execute_query(
@@ -1349,7 +1368,7 @@ mod tests {
}
async fn run_query(n: usize) -> Result<()> {
- // Tests running query with empty tables, to see whether they run succesfully.
+ // Tests running query with empty tables, to see whether they run successfully.
let config = SessionConfig::new()
.with_target_partitions(1)
@@ -1365,8 +1384,10 @@ mod tests {
ctx.register_table(table, Arc::new(provider))?;
}
- let plan = create_logical_plan(&ctx, n)?;
- execute_query(&ctx, &plan, false).await?;
+ let plans = create_logical_plans(&ctx, n)?;
+ for plan in plans {
+ execute_query(&ctx, &plan, false).await?;
+ }
Ok(())
}
@@ -1482,61 +1503,63 @@ mod tests {
}
// test logical plan round trip
- let plan = create_logical_plan(&ctx, n)?;
- let proto: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- &plan,
- codec.logical_extension_codec(),
- )
- .unwrap();
- let round_trip: LogicalPlan = (&proto)
- .try_into_logical_plan(&ctx, codec.logical_extension_codec())
- .unwrap();
- assert_eq!(
- format!("{:?}", plan),
- format!("{:?}", round_trip),
- "logical plan round trip failed"
- );
-
- // test optimized logical plan round trip
- let plan = ctx.optimize(&plan)?;
- let proto: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- &plan,
- codec.logical_extension_codec(),
- )
- .unwrap();
- let round_trip: LogicalPlan = (&proto)
- .try_into_logical_plan(&ctx, codec.logical_extension_codec())
- .unwrap();
- assert_eq!(
- format!("{:?}", plan),
- format!("{:?}", round_trip),
- "optimized logical plan round trip failed"
- );
-
- // test physical plan roundtrip
- if env::var("TPCH_DATA").is_ok() {
- let physical_plan = ctx.create_physical_plan(&plan).await?;
- let proto: protobuf::PhysicalPlanNode =
- protobuf::PhysicalPlanNode::try_from_physical_plan(
- physical_plan.clone(),
- codec.physical_extension_codec(),
+ let plans = create_logical_plans(&ctx, n)?;
+ for plan in plans {
+ let proto: protobuf::LogicalPlanNode =
+ protobuf::LogicalPlanNode::try_from_logical_plan(
+ &plan,
+ codec.logical_extension_codec(),
)
.unwrap();
- let runtime = ctx.runtime_env();
- let round_trip: Arc<dyn ExecutionPlan> = (&proto)
- .try_into_physical_plan(
- &ctx,
- runtime.deref(),
- codec.physical_extension_codec(),
+ let round_trip: LogicalPlan = (&proto)
+ .try_into_logical_plan(&ctx, codec.logical_extension_codec())
+ .unwrap();
+ assert_eq!(
+ format!("{:?}", plan),
+ format!("{:?}", round_trip),
+ "logical plan round trip failed"
+ );
+
+ // test optimized logical plan round trip
+ let plan = ctx.optimize(&plan)?;
+ let proto: protobuf::LogicalPlanNode =
+ protobuf::LogicalPlanNode::try_from_logical_plan(
+ &plan,
+ codec.logical_extension_codec(),
)
.unwrap();
+ let round_trip: LogicalPlan = (&proto)
+ .try_into_logical_plan(&ctx, codec.logical_extension_codec())
+ .unwrap();
assert_eq!(
- format!("{:?}", physical_plan),
+ format!("{:?}", plan),
format!("{:?}", round_trip),
- "physical plan round trip failed"
+ "optimized logical plan round trip failed"
);
+
+ // test physical plan roundtrip
+ if env::var("TPCH_DATA").is_ok() {
+ let physical_plan = ctx.create_physical_plan(&plan).await?;
+ let proto: protobuf::PhysicalPlanNode =
+ protobuf::PhysicalPlanNode::try_from_physical_plan(
+ physical_plan.clone(),
+ codec.physical_extension_codec(),
+ )
+ .unwrap();
+ let runtime = ctx.runtime_env();
+ let round_trip: Arc<dyn ExecutionPlan> = (&proto)
+ .try_into_physical_plan(
+ &ctx,
+ runtime.deref(),
+ codec.physical_extension_codec(),
+ )
+ .unwrap();
+ assert_eq!(
+ format!("{:?}", physical_plan),
+ format!("{:?}", round_trip),
+ "physical plan round trip failed"
+ );
+ }
}
Ok(())