You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by th...@apache.org on 2022/10/08 13:39:03 UTC
[arrow-datafusion] branch row-filter-benchmarks updated: Rework to test everything at once
This is an automated email from the ASF dual-hosted git repository.
thinkharderdev pushed a commit to branch row-filter-benchmarks
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/row-filter-benchmarks by this push:
new 66c10763e Rework to test everything at once
66c10763e is described below
commit 66c10763e64798afbc28250bd9359a41abf8b2f2
Author: Dan Harris <da...@thinkharder.dev>
AuthorDate: Sat Oct 8 09:38:53 2022 -0400
Rework to test everything at once
---
benchmarks/Cargo.toml | 1 +
benchmarks/README.md | 40 ++---
.../{access_log.rs => parquet_filter_pushdown.rs} | 183 ++++++++++++++-------
3 files changed, 148 insertions(+), 76 deletions(-)
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 868475b6f..c07344508 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -38,6 +38,7 @@ env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = "1.13.0"
+object_store = "0.5.0"
parquet = "24.0.0"
rand = "0.8.4"
serde = { version = "1.0.136", features = ["derive"] }
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 1a08ac358..7cb8fedaf 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -127,36 +127,36 @@ h2o groupby query 1 took 1669 ms
[1]: http://www.tpc.org/tpch/
[2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
-## Access Logs benchmarks
+## Parquet filter pushdown benchmarks
-This is a set of benchmarks for testing and verifying performance of parquet row filter pushdown. The queries are executed on
+This is a set of benchmarks for testing and verifying performance of parquet filter pushdown. The queries are executed on
a synthetic dataset generated during the benchmark execution and designed to simulate web server access logs.
```base
-cargo run --release --bin access_logs --query --path ./data --scale-factor 1.0
+cargo run --release --bin parquet_filter_pushdown -- --path ./data --scale-factor 1.0
```
This will generate the synthetic dataset at `./data/logs.parquet`. The size of the dataset can be controlled through the `size_factor`
(with the default value of `1.0` generating a ~1GB parquet file).
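+For each filter, the scan is run once per `ParquetScanOptions` configuration. Note that when `pushdown_filters` is `false` the predicate is not evaluated against individual rows, so the scan can return rows that do not match the filter (in the first group of iterations below it returns the entire dataset); row counts and timings from that configuration are therefore not directly comparable with the pushdown runs.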
+
Example run:
```
Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data", batch_size: 8192, scale_factor: 1.0 }
-Generated test dataset with 1266707 rows
-Executing 'get_requests'
-Query 'get_requests' iteration 0 returned 211247 rows in 677 ms
-Query 'get_requests' iteration 1 returned 211247 rows in 661 ms
-Query 'get_requests' iteration 2 returned 211247 rows in 699 ms
-Executing 'get_requests_ignore_body'
-Query 'get_requests_ignore_body' iteration 0 returned 211247 rows in 155 ms
-Query 'get_requests_ignore_body' iteration 1 returned 211247 rows in 159 ms
-Query 'get_requests_ignore_body' iteration 2 returned 211247 rows in 153 ms
-Executing 'get_post_503'
-Query 'get_post_503' iteration 0 returned 42350 rows in 650 ms
-Query 'get_post_503' iteration 1 returned 42350 rows in 659 ms
-Query 'get_post_503' iteration 2 returned 42350 rows in 706 ms
-Executing 'get_post_503_ignore_body'
-Query 'get_post_503_ignore_body' iteration 0 returned 42350 rows in 155 ms
-Query 'get_post_503_ignore_body' iteration 1 returned 42350 rows in 151 ms
-Query 'get_post_503_ignore_body' iteration 2 returned 42350 rows in 157 ms
+Generated test dataset with 10699521 rows
+Executing with filter 'request_method = Utf8("GET")'
+Using scan options ParquetScanOptions { pushdown_filters: false, reorder_predicates: false, enable_page_index: false }
+Iteration 0 returned 10699521 rows in 1303 ms
+Iteration 1 returned 10699521 rows in 1288 ms
+Iteration 2 returned 10699521 rows in 1266 ms
+Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: true, enable_page_index: true }
+Iteration 0 returned 1781686 rows in 1970 ms
+Iteration 1 returned 1781686 rows in 2002 ms
+Iteration 2 returned 1781686 rows in 1988 ms
+Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: false, enable_page_index: true }
+Iteration 0 returned 1781686 rows in 1940 ms
+Iteration 1 returned 1781686 rows in 1986 ms
+Iteration 2 returned 1781686 rows in 1947 ms
+...
```
\ No newline at end of file
diff --git a/benchmarks/src/bin/access_log.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
similarity index 68%
rename from benchmarks/src/bin/access_log.rs
rename to benchmarks/src/bin/parquet_filter_pushdown.rs
index 2867c0365..5bf7c11ef 100644
--- a/benchmarks/src/bin/access_log.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -6,12 +6,19 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty;
use datafusion::common::Result;
+use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::{lit, or, Expr};
use datafusion::physical_plan::collect;
-use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
+use datafusion::physical_plan::file_format::{
+ FileScanConfig, ParquetExec, ParquetScanOptions,
+};
+use datafusion::prelude::{col, combine_filters, SessionConfig, SessionContext};
+use object_store::path::Path;
+use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
-use std::collections::HashMap;
use std::fs::File;
use std::ops::Range;
use std::path::PathBuf;
@@ -63,68 +70,122 @@ async fn main() -> Result<()> {
let path = opt.path.join("logs.parquet");
- gen_data(&path, opt.scale_factor);
+ let (object_store_url, object_meta) = gen_data(path, opt.scale_factor)?;
- ctx.register_parquet(
- "logs",
- path.to_str().unwrap(),
- ParquetReadOptions::default(),
+ run_benchmarks(
+ &mut ctx,
+ object_store_url.clone(),
+ object_meta.clone(),
+ opt.iterations,
+ opt.debug,
)
.await?;
- datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.debug).await?;
-
Ok(())
}
-async fn datafusion_sql_benchmarks(
+async fn run_benchmarks(
ctx: &mut SessionContext,
+ object_store_url: ObjectStoreUrl,
+ object_meta: ObjectMeta,
iterations: usize,
debug: bool,
) -> Result<()> {
- let mut queries = HashMap::new();
- queries.insert(
- "get_requests",
- "SELECT * FROM logs WHERE request_method = 'GET'",
- );
- queries.insert(
- "get_requests_ignore_body",
- "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'GET'"
- );
- queries.insert(
- "get_post_503",
- "SELECT * FROM logs WHERE request_method = 'POST' AND response_status = 503",
- );
- queries.insert(
- "get_post_503_ignore_body",
- "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'POST' AND response_status = 503",
- );
- for (name, sql) in &queries {
- println!("Executing '{}'", name);
- for i in 0..iterations {
- let start = Instant::now();
- let rows = execute_sql(ctx, sql, debug).await?;
- println!(
- "Query '{}' iteration {} returned {} rows in {} ms",
- name,
- i,
- rows,
- start.elapsed().as_millis()
- );
+ let scan_options_matrix = vec![
+ ParquetScanOptions::default(),
+ ParquetScanOptions::default()
+ .with_page_index(true)
+ .with_pushdown_filters(true)
+ .with_reorder_predicates(true),
+ ParquetScanOptions::default()
+ .with_page_index(true)
+ .with_pushdown_filters(true)
+ .with_reorder_predicates(false),
+ ];
+
+ let filter_matrix = vec![
+ // Selective-ish filter
+ col("request_method").eq(lit("GET")),
+ // Non-selective filter
+ col("request_method").not_eq(lit("GET")),
+ // Basic conjunction
+ col("request_method")
+ .eq(lit("POST"))
+ .and(col("response_status").eq(lit(503_u16))),
+ // Nested filters
+ col("request_method").eq(lit("POST")).and(or(
+ col("response_status").eq(lit(503_u16)),
+ col("response_status").eq(lit(403_u16)),
+ )),
+ // Many filters
+ combine_filters(&[
+ col("request_method").not_eq(lit("GET")),
+ col("response_status").eq(lit(400_u16)),
+ col("service").eq(lit("backend")),
+ ])
+ .unwrap(),
+ // Filter everything
+ col("response_status").eq(lit(429_u16)),
+ // Filter nothing
+ col("response_status").gt(lit(0_u16)),
+ ];
+
+ for filter_expr in &filter_matrix {
+ println!("Executing with filter '{}'", filter_expr);
+ for scan_options in &scan_options_matrix {
+ println!("Using scan options {:?}", scan_options);
+ for i in 0..iterations {
+ let start = Instant::now();
+ let rows = exec_scan(
+ ctx,
+ object_store_url.clone(),
+ object_meta.clone(),
+ filter_expr.clone(),
+ scan_options.clone(),
+ debug,
+ )
+ .await?;
+ println!(
+ "Iteration {} returned {} rows in {} ms",
+ i,
+ rows,
+ start.elapsed().as_millis()
+ );
+ }
}
+ println!("\n");
}
Ok(())
}
-async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<usize> {
- let plan = ctx.create_logical_plan(sql)?;
- let plan = ctx.optimize(&plan)?;
- if debug {
- println!("Optimized logical plan:\n{:?}", plan);
- }
- let physical_plan = ctx.create_physical_plan(&plan).await?;
+async fn exec_scan(
+ ctx: &SessionContext,
+ object_store_url: ObjectStoreUrl,
+ object_meta: ObjectMeta,
+ filter: Expr,
+ scan_options: ParquetScanOptions,
+ debug: bool,
+) -> Result<usize> {
+ let scan_config = FileScanConfig {
+ object_store_url,
+ file_schema: BatchBuilder::schema(),
+ file_groups: vec![vec![PartitionedFile {
+ object_meta,
+ partition_values: vec![],
+ range: None,
+ extensions: None,
+ }]],
+ statistics: Default::default(),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ };
+ let exec = Arc::new(
+ ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options),
+ );
+
let task_ctx = ctx.task_ctx();
- let result = collect(physical_plan, task_ctx).await?;
+ let result = collect(exec, task_ctx).await?;
if debug {
pretty::print_batches(&result)?;
@@ -132,15 +193,15 @@ async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<usi
Ok(result.iter().map(|b| b.num_rows()).sum())
}
-fn gen_data(path: &PathBuf, scale_factor: f32) {
+fn gen_data(path: PathBuf, scale_factor: f32) -> Result<(ObjectStoreUrl, ObjectMeta)> {
let generator = Generator::new();
- let file = File::create(path).unwrap();
+ let file = File::create(&path).unwrap();
let mut writer = ArrowWriter::try_new(file, generator.schema.clone(), None).unwrap();
let mut num_rows = 0;
- let num_batches = 12_f32 * scale_factor;
+ let num_batches = 100_f32 * scale_factor;
for batch in generator.take(num_batches as usize) {
writer.write(&batch).unwrap();
@@ -149,6 +210,22 @@ fn gen_data(path: &PathBuf, scale_factor: f32) {
writer.close().unwrap();
println!("Generated test dataset with {} rows", num_rows);
+
+ let size = std::fs::metadata(&path)?.len() as usize;
+
+ let canonical_path = path.canonicalize()?;
+
+ let object_store_url =
+ ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
+ .object_store();
+
+ let object_meta = ObjectMeta {
+ location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
+ last_modified: Default::default(),
+ size,
+ };
+
+ Ok((object_store_url, object_meta))
}
#[derive(Default)]
@@ -167,7 +244,6 @@ struct BatchBuilder {
request_bytes: Int32Builder,
response_bytes: Int32Builder,
response_status: UInt16Builder,
- response_body: StringBuilder,
}
impl BatchBuilder {
@@ -194,8 +270,6 @@ impl BatchBuilder {
Field::new("request_bytes", DataType::Int32, true),
Field::new("response_bytes", DataType::Int32, true),
Field::new("response_status", DataType::UInt16, false),
- // This column will contain large values relative to the others
- Field::new("response_body", DataType::Utf8, false),
]))
}
@@ -261,8 +335,6 @@ impl BatchBuilder {
.append_option(rng.gen_bool(0.9).then(|| rng.gen()));
self.response_status
.append_value(status[rng.gen_range(0..status.len())]);
- self.response_body
- .append_value(random_string(rng, 200..2000))
}
fn finish(mut self, schema: SchemaRef) -> RecordBatch {
@@ -283,7 +355,6 @@ impl BatchBuilder {
Arc::new(self.request_bytes.finish()),
Arc::new(self.response_bytes.finish()),
Arc::new(self.response_status.finish()),
- Arc::new(self.response_body.finish()),
],
)
.unwrap()