Posted to commits@arrow.apache.org by th...@apache.org on 2022/10/08 13:39:03 UTC

[arrow-datafusion] branch row-filter-benchmarks updated: Rework to test everything at once

This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch row-filter-benchmarks
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/row-filter-benchmarks by this push:
     new 66c10763e Rework to test everything at once
66c10763e is described below

commit 66c10763e64798afbc28250bd9359a41abf8b2f2
Author: Dan Harris <da...@thinkharder.dev>
AuthorDate: Sat Oct 8 09:38:53 2022 -0400

    Rework to test everything at once
---
 benchmarks/Cargo.toml                              |   1 +
 benchmarks/README.md                               |  40 ++---
 .../{access_log.rs => parquet_filter_pushdown.rs}  | 183 ++++++++++++++-------
 3 files changed, 148 insertions(+), 76 deletions(-)

diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 868475b6f..c07344508 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -38,6 +38,7 @@ env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
+object_store = "0.5.0"
 parquet = "24.0.0"
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 1a08ac358..7cb8fedaf 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -127,36 +127,36 @@ h2o groupby query 1 took 1669 ms
 [1]: http://www.tpc.org/tpch/
 [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
 
-## Access Logs benchmarks
+## Parquet filter pushdown benchmarks
 
-This is a set of benchmarks for testing and verifying performance of parquet row filter pushdown. The queries are executed on
+This is a set of benchmarks for testing and verifying the performance of parquet filter pushdown. The queries are executed on
 a synthetic dataset generated during the benchmark execution and designed to simulate web server access logs. 
 
 ```bash
-cargo run --release --bin access_logs --query --path ./data --scale-factor 1.0
+cargo run --release --bin parquet_filter_pushdown -- --path ./data --scale-factor 1.0
 ```
 
 This will generate the synthetic dataset at `./data/logs.parquet`. The size of the dataset can be controlled through the `--scale-factor`
 option (with the default value of `1.0` generating a ~1GB parquet file).
 
+For each filter, the benchmark runs the query once per `ParquetScanOptions` configuration.
+
 Example run:
 ```
 Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data", batch_size: 8192, scale_factor: 1.0 }
-Generated test dataset with 1266707 rows
-Executing 'get_requests'
-Query 'get_requests' iteration 0 returned 211247 rows in 677 ms
-Query 'get_requests' iteration 1 returned 211247 rows in 661 ms
-Query 'get_requests' iteration 2 returned 211247 rows in 699 ms
-Executing 'get_requests_ignore_body'
-Query 'get_requests_ignore_body' iteration 0 returned 211247 rows in 155 ms
-Query 'get_requests_ignore_body' iteration 1 returned 211247 rows in 159 ms
-Query 'get_requests_ignore_body' iteration 2 returned 211247 rows in 153 ms
-Executing 'get_post_503'
-Query 'get_post_503' iteration 0 returned 42350 rows in 650 ms
-Query 'get_post_503' iteration 1 returned 42350 rows in 659 ms
-Query 'get_post_503' iteration 2 returned 42350 rows in 706 ms
-Executing 'get_post_503_ignore_body'
-Query 'get_post_503_ignore_body' iteration 0 returned 42350 rows in 155 ms
-Query 'get_post_503_ignore_body' iteration 1 returned 42350 rows in 151 ms
-Query 'get_post_503_ignore_body' iteration 2 returned 42350 rows in 157 ms
+Generated test dataset with 10699521 rows
+Executing with filter 'request_method = Utf8("GET")'
+Using scan options ParquetScanOptions { pushdown_filters: false, reorder_predicates: false, enable_page_index: false }
+Iteration 0 returned 10699521 rows in 1303 ms
+Iteration 1 returned 10699521 rows in 1288 ms
+Iteration 2 returned 10699521 rows in 1266 ms
+Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: true, enable_page_index: true }
+Iteration 0 returned 1781686 rows in 1970 ms
+Iteration 1 returned 1781686 rows in 2002 ms
+Iteration 2 returned 1781686 rows in 1988 ms
+Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: false, enable_page_index: true }
+Iteration 0 returned 1781686 rows in 1940 ms
+Iteration 1 returned 1781686 rows in 1986 ms
+Iteration 2 returned 1781686 rows in 1947 ms
+...
 ```
\ No newline at end of file
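
The rework replaces the old named SQL queries with DataFusion `Expr` filters built programmatically. For reference, a minimal sketch of how the old predicates map onto the `Expr` DSL used in the new binary (imports mirror those in `parquet_filter_pushdown.rs`; the function names here are illustrative only):

```rust
use datafusion::logical_expr::{lit, Expr};
use datafusion::prelude::col;

/// Equivalent of the old `get_requests` query:
/// `SELECT * FROM logs WHERE request_method = 'GET'`
fn get_requests_filter() -> Expr {
    col("request_method").eq(lit("GET"))
}

/// Equivalent of the old `get_post_503` query:
/// `SELECT * FROM logs WHERE request_method = 'POST' AND response_status = 503`
fn get_post_503_filter() -> Expr {
    col("request_method")
        .eq(lit("POST"))
        .and(col("response_status").eq(lit(503_u16)))
}
```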
diff --git a/benchmarks/src/bin/access_log.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
similarity index 68%
rename from benchmarks/src/bin/access_log.rs
rename to benchmarks/src/bin/parquet_filter_pushdown.rs
index 2867c0365..5bf7c11ef 100644
--- a/benchmarks/src/bin/access_log.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -6,12 +6,19 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty;
 use datafusion::common::Result;
+use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::physical_plan::collect;
-use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
+use datafusion::physical_plan::file_format::{
+    FileScanConfig, ParquetExec, ParquetScanOptions,
+};
+use datafusion::prelude::{col, combine_filters, SessionConfig, SessionContext};
+use object_store::path::Path;
+use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
 use rand::rngs::StdRng;
 use rand::{Rng, SeedableRng};
-use std::collections::HashMap;
 use std::fs::File;
 use std::ops::Range;
 use std::path::PathBuf;
@@ -63,68 +70,122 @@ async fn main() -> Result<()> {
 
     let path = opt.path.join("logs.parquet");
 
-    gen_data(&path, opt.scale_factor);
+    let (object_store_url, object_meta) = gen_data(path, opt.scale_factor)?;
 
-    ctx.register_parquet(
-        "logs",
-        path.to_str().unwrap(),
-        ParquetReadOptions::default(),
+    run_benchmarks(
+        &mut ctx,
+        object_store_url.clone(),
+        object_meta.clone(),
+        opt.iterations,
+        opt.debug,
     )
     .await?;
 
-    datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.debug).await?;
-
     Ok(())
 }
 
-async fn datafusion_sql_benchmarks(
+async fn run_benchmarks(
     ctx: &mut SessionContext,
+    object_store_url: ObjectStoreUrl,
+    object_meta: ObjectMeta,
     iterations: usize,
     debug: bool,
 ) -> Result<()> {
-    let mut queries = HashMap::new();
-    queries.insert(
-        "get_requests",
-        "SELECT * FROM logs WHERE request_method = 'GET'",
-    );
-    queries.insert(
-        "get_requests_ignore_body",
-        "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'GET'"
-    );
-    queries.insert(
-        "get_post_503",
-        "SELECT * FROM logs WHERE request_method = 'POST' AND response_status = 503",
-    );
-    queries.insert(
-        "get_post_503_ignore_body",
-        "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'POST' AND response_status = 503",
-    );
-    for (name, sql) in &queries {
-        println!("Executing '{}'", name);
-        for i in 0..iterations {
-            let start = Instant::now();
-            let rows = execute_sql(ctx, sql, debug).await?;
-            println!(
-                "Query '{}' iteration {} returned {} rows in {} ms",
-                name,
-                i,
-                rows,
-                start.elapsed().as_millis()
-            );
+    let scan_options_matrix = vec![
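+        // Baseline: no filter pushdown, no predicate reordering, no page index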
+        ParquetScanOptions::default(),
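+        // Full pushdown: filters evaluated during the scan, with predicate reordering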
+        ParquetScanOptions::default()
+            .with_page_index(true)
+            .with_pushdown_filters(true)
+            .with_reorder_predicates(true),
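+        // Pushdown enabled, but predicates kept in the order they were written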
+        ParquetScanOptions::default()
+            .with_page_index(true)
+            .with_pushdown_filters(true)
+            .with_reorder_predicates(false),
+    ];
+
+    let filter_matrix = vec![
+        // Selective-ish filter
+        col("request_method").eq(lit("GET")),
+        // Non-selective filter
+        col("request_method").not_eq(lit("GET")),
+        // Basic conjunction
+        col("request_method")
+            .eq(lit("POST"))
+            .and(col("response_status").eq(lit(503_u16))),
+        // Nested filters
+        col("request_method").eq(lit("POST")).and(or(
+            col("response_status").eq(lit(503_u16)),
+            col("response_status").eq(lit(403_u16)),
+        )),
+        // Many filters
+        combine_filters(&[
+            col("request_method").not_eq(lit("GET")),
+            col("response_status").eq(lit(400_u16)),
+            col("service").eq(lit("backend")),
+        ])
+        .unwrap(),
+        // Filter everything
+        col("response_status").eq(lit(429_u16)),
+        // Filter nothing
+        col("response_status").gt(lit(0_u16)),
+    ];
+
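+    // Run every filter against every scan-option profile, `iterations` times each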
+    for filter_expr in &filter_matrix {
+        println!("Executing with filter '{}'", filter_expr);
+        for scan_options in &scan_options_matrix {
+            println!("Using scan options {:?}", scan_options);
+            for i in 0..iterations {
+                let start = Instant::now();
+                let rows = exec_scan(
+                    ctx,
+                    object_store_url.clone(),
+                    object_meta.clone(),
+                    filter_expr.clone(),
+                    scan_options.clone(),
+                    debug,
+                )
+                .await?;
+                println!(
+                    "Iteration {} returned {} rows in {} ms",
+                    i,
+                    rows,
+                    start.elapsed().as_millis()
+                );
+            }
         }
+        println!("\n");
     }
     Ok(())
 }
 
-async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<usize> {
-    let plan = ctx.create_logical_plan(sql)?;
-    let plan = ctx.optimize(&plan)?;
-    if debug {
-        println!("Optimized logical plan:\n{:?}", plan);
-    }
-    let physical_plan = ctx.create_physical_plan(&plan).await?;
+async fn exec_scan(
+    ctx: &SessionContext,
+    object_store_url: ObjectStoreUrl,
+    object_meta: ObjectMeta,
+    filter: Expr,
+    scan_options: ParquetScanOptions,
+    debug: bool,
+) -> Result<usize> {
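+    // Scan the generated file directly as a single-file group, bypassing table registration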
+    let scan_config = FileScanConfig {
+        object_store_url,
+        file_schema: BatchBuilder::schema(),
+        file_groups: vec![vec![PartitionedFile {
+            object_meta,
+            partition_values: vec![],
+            range: None,
+            extensions: None,
+        }]],
+        statistics: Default::default(),
+        projection: None,
+        limit: None,
+        table_partition_cols: vec![],
+    };
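+    // Hand the filter to ParquetExec so it can be pushed into the scan when enabled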
+    let exec = Arc::new(
+        ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options),
+    );
+
     let task_ctx = ctx.task_ctx();
-    let result = collect(physical_plan, task_ctx).await?;
+    let result = collect(exec, task_ctx).await?;
 
     if debug {
         pretty::print_batches(&result)?;
@@ -132,15 +193,15 @@ async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<usi
     Ok(result.iter().map(|b| b.num_rows()).sum())
 }
 
-fn gen_data(path: &PathBuf, scale_factor: f32) {
+fn gen_data(path: PathBuf, scale_factor: f32) -> Result<(ObjectStoreUrl, ObjectMeta)> {
     let generator = Generator::new();
 
-    let file = File::create(path).unwrap();
+    let file = File::create(&path).unwrap();
     let mut writer = ArrowWriter::try_new(file, generator.schema.clone(), None).unwrap();
 
     let mut num_rows = 0;
 
-    let num_batches = 12_f32 * scale_factor;
+    let num_batches = 100_f32 * scale_factor;
 
     for batch in generator.take(num_batches as usize) {
         writer.write(&batch).unwrap();
@@ -149,6 +210,22 @@ fn gen_data(path: &PathBuf, scale_factor: f32) {
     writer.close().unwrap();
 
     println!("Generated test dataset with {} rows", num_rows);
+
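+    // Capture the object store URL and file metadata so the scan can locate the file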
+    let size = std::fs::metadata(&path)?.len() as usize;
+
+    let canonical_path = path.canonicalize()?;
+
+    let object_store_url =
+        ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
+            .object_store();
+
+    let object_meta = ObjectMeta {
+        location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
+        last_modified: Default::default(),
+        size,
+    };
+
+    Ok((object_store_url, object_meta))
 }
 
 #[derive(Default)]
@@ -167,7 +244,6 @@ struct BatchBuilder {
     request_bytes: Int32Builder,
     response_bytes: Int32Builder,
     response_status: UInt16Builder,
-    response_body: StringBuilder,
 }
 
 impl BatchBuilder {
@@ -194,8 +270,6 @@ impl BatchBuilder {
             Field::new("request_bytes", DataType::Int32, true),
             Field::new("response_bytes", DataType::Int32, true),
             Field::new("response_status", DataType::UInt16, false),
-            // This column will contain large values relative to the others
-            Field::new("response_body", DataType::Utf8, false),
         ]))
     }
 
@@ -261,8 +335,6 @@ impl BatchBuilder {
             .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
         self.response_status
             .append_value(status[rng.gen_range(0..status.len())]);
-        self.response_body
-            .append_value(random_string(rng, 200..2000))
     }
 
     fn finish(mut self, schema: SchemaRef) -> RecordBatch {
@@ -283,7 +355,6 @@ impl BatchBuilder {
                 Arc::new(self.request_bytes.finish()),
                 Arc::new(self.response_bytes.finish()),
                 Arc::new(self.response_status.finish()),
-                Arc::new(self.response_body.finish()),
             ],
         )
         .unwrap()
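
For anyone wanting to reuse the scan pattern outside the benchmark, the core of `exec_scan` reduces to a few lines. A minimal sketch using only the APIs that appear in the diff above (it assumes `scan_config` is a `FileScanConfig` built as in `exec_scan`, and `ctx` is a `SessionContext`, inside an async context):

```rust
use std::sync::Arc;
use datafusion::logical_expr::lit;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::file_format::{ParquetExec, ParquetScanOptions};
use datafusion::prelude::col;

// Build the pushdown-enabled scan options exercised by the benchmark.
let scan_options = ParquetScanOptions::default()
    .with_page_index(true)
    .with_pushdown_filters(true)
    .with_reorder_predicates(true);

// Push a filter into the ParquetExec and collect the surviving rows.
let filter = col("response_status").eq(lit(503_u16));
let exec = Arc::new(
    ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options),
);
let batches = collect(exec, ctx.task_ctx()).await?;
let num_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
```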