You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2022/10/08 11:57:34 UTC

[arrow-datafusion] 01/01: Add benchmarks for testing row filtering

This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch row-filter-benchmarks
in repository

commit 95afefafd6172c0579daf6b8b660b8b6966e1315
Author: Dan Harris <>
AuthorDate: Sat Oct 8 07:57:14 2022 -0400

    Add benchmarks for testing row filtering
 benchmarks/Cargo.toml            |   2 +
 benchmarks/README.md    |  34 ++++
 benchmarks/src/bin/ | 356 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 392 insertions(+)

diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 7367e9682..868475b6f 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -32,11 +32,13 @@ simd = ["datafusion/simd"]
 snmalloc = ["snmalloc-rs"]
+arrow = "24.0.0"
 datafusion = { path = "../datafusion/core" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
+parquet = "24.0.0"
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = "1.0.78"
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 505469fc5..1a08ac358 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -126,3 +126,37 @@ h2o groupby query 1 took 1669 ms
+## Access Logs benchmarks
+This is a set of benchmarks for testing and verifying performance of parquet row filter pushdown. The queries are executed on
+a synthetic dataset generated during the benchmark execution and designed to simulate web server access logs. 
+cargo run --release --bin access_logs -- --path ./data --scale-factor 1.0
+This will generate the synthetic dataset at `./data/logs.parquet`. The size of the dataset can be controlled through the `--scale-factor` option
+(with the default value of `1.0` generating a ~1GB parquet file).
+Example run:
+Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data", batch_size: 8192, scale_factor: 1.0 }
+Generated test dataset with 1266707 rows
+Executing 'get_requests'
+Query 'get_requests' iteration 0 returned 211247 rows in 677 ms
+Query 'get_requests' iteration 1 returned 211247 rows in 661 ms
+Query 'get_requests' iteration 2 returned 211247 rows in 699 ms
+Executing 'get_requests_ignore_body'
+Query 'get_requests_ignore_body' iteration 0 returned 211247 rows in 155 ms
+Query 'get_requests_ignore_body' iteration 1 returned 211247 rows in 159 ms
+Query 'get_requests_ignore_body' iteration 2 returned 211247 rows in 153 ms
+Executing 'get_post_503'
+Query 'get_post_503' iteration 0 returned 42350 rows in 650 ms
+Query 'get_post_503' iteration 1 returned 42350 rows in 659 ms
+Query 'get_post_503' iteration 2 returned 42350 rows in 706 ms
+Executing 'get_post_503_ignore_body'
+Query 'get_post_503_ignore_body' iteration 0 returned 42350 rows in 155 ms
+Query 'get_post_503_ignore_body' iteration 1 returned 42350 rows in 151 ms
+Query 'get_post_503_ignore_body' iteration 2 returned 42350 rows in 157 ms
\ No newline at end of file
diff --git a/benchmarks/src/bin/ b/benchmarks/src/bin/
new file mode 100644
index 000000000..2867c0365
--- /dev/null
+++ b/benchmarks/src/bin/
@@ -0,0 +1,356 @@
+use arrow::array::{
+    Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
+    UInt16Builder,
+use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty;
+use datafusion::common::Result;
+use datafusion::physical_plan::collect;
+use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
+use parquet::arrow::ArrowWriter;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use std::collections::HashMap;
+use std::fs::File;
+use std::ops::Range;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+use structopt::StructOpt;
+// Install snmalloc as the process-wide allocator when the `snmalloc` feature is enabled.
+// Without `#[global_allocator]` the static below is never registered and has no effect.
+#[cfg(feature = "snmalloc")]
+#[global_allocator]
+static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+// Command-line options of the benchmark binary, parsed by `structopt`.
+// The `///` comments on the fields double as the `--help` text.
+#[derive(Debug, StructOpt)]
+#[structopt(name = "Benchmarks", about = "Apache Arrow Rust Benchmarks.")]
+struct Opt {
+    /// Activate debug mode to see query results
+    #[structopt(short, long)]
+    debug: bool,
+    /// Number of iterations of each test run
+    #[structopt(short = "i", long = "iterations", default_value = "3")]
+    iterations: usize,
+    /// Number of partitions to process in parallel
+    #[structopt(long = "partitions", default_value = "2")]
+    partitions: usize,
+    /// Path to folder where access log file will be generated
+    #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
+    path: PathBuf,
+    /// Batch size when reading Parquet files
+    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+    batch_size: usize,
+    /// Total size of generated dataset. The default scale factor of 1.0 will generate a roughly 1GB parquet file
+    // Long-only on purpose: `-s` already belongs to `--batch-size`, and two
+    // arguments must not share the same short flag.
+    #[structopt(long = "scale-factor", default_value = "1.0")]
+    scale_factor: f32,
+}
+/// Entry point: parses the options, generates the synthetic access-log dataset
+/// at `<path>/logs.parquet`, registers it as table `logs` and runs the SQL
+/// benchmarks against it.
+///
+/// # Errors
+/// Returns an error if the output folder cannot be created, the parquet file
+/// cannot be registered, or any benchmark query fails.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let opt: Opt = Opt::from_args();
+    println!("Running benchmarks with the following options: {:?}", opt);
+    let config = SessionConfig::new()
+        .with_target_partitions(opt.partitions)
+        .with_batch_size(opt.batch_size);
+    let mut ctx = SessionContext::with_config(config);
+    // `gen_data` only creates the file itself, so make sure the target folder
+    // exists first instead of panicking on a fresh `--path`.
+    std::fs::create_dir_all(&opt.path)?;
+    let path = opt.path.join("logs.parquet");
+    gen_data(&path, opt.scale_factor);
+    ctx.register_parquet(
+        "logs",
+        path.to_str().unwrap(),
+        ParquetReadOptions::default(),
+    )
+    .await?;
+    datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.debug).await?;
+    Ok(())
+}
+/// Runs every benchmark query `iterations` times against `ctx` and prints the
+/// row count and wall-clock time of each iteration.
+///
+/// `debug` is forwarded to `execute_sql`, which then also prints the optimized
+/// plan and the query results.
+///
+/// # Errors
+/// Returns the first error produced while planning or executing a query.
+async fn datafusion_sql_benchmarks(
+    ctx: &mut SessionContext,
+    iterations: usize,
+    debug: bool,
+) -> Result<()> {
+    // An ordered list instead of a `HashMap`: hash-map iteration order changes
+    // from run to run, which would make the benchmark output (and any effect
+    // of running one query before another) non-reproducible.
+    let queries = [
+        (
+            "get_requests",
+            "SELECT * FROM logs WHERE request_method = 'GET'",
+        ),
+        (
+            "get_requests_ignore_body",
+            "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'GET'",
+        ),
+        (
+            "get_post_503",
+            "SELECT * FROM logs WHERE request_method = 'POST' AND response_status = 503",
+        ),
+        (
+            "get_post_503_ignore_body",
+            "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'POST' AND response_status = 503",
+        ),
+    ];
+    for (name, sql) in queries.iter() {
+        println!("Executing '{}'", name);
+        for i in 0..iterations {
+            let start = Instant::now();
+            let rows = execute_sql(ctx, sql, debug).await?;
+            println!(
+                "Query '{}' iteration {} returned {} rows in {} ms",
+                name,
+                i,
+                rows,
+                start.elapsed().as_millis()
+            );
+        }
+    }
+    Ok(())
+}
+/// Plans, optimizes and executes `sql` on `ctx`, returning the total number of
+/// rows across all result batches.
+///
+/// When `debug` is set, the optimized logical plan and the result batches are
+/// printed to stdout.
+///
+/// # Errors
+/// Returns any error raised while planning, executing or printing the query.
+async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<usize> {
+    let plan = ctx.create_logical_plan(sql)?;
+    let plan = ctx.optimize(&plan)?;
+    if debug {
+        println!("Optimized logical plan:\n{:?}", plan);
+    }
+    let physical_plan = ctx.create_physical_plan(&plan).await?;
+    let task_ctx = ctx.task_ctx();
+    let result = collect(physical_plan, task_ctx).await?;
+    if debug {
+        pretty::print_batches(&result)?;
+    }
+    Ok(result.iter().map(|b| b.num_rows()).sum())
+}
+/// Writes the synthetic access-log dataset to a parquet file at `path` and
+/// prints the number of generated rows.
+///
+/// The number of record batches is `12 * scale_factor`, truncated towards zero.
+///
+/// # Panics
+/// Panics if the file cannot be created or a batch cannot be written.
+fn gen_data(path: &std::path::Path, scale_factor: f32) {
+    let generator = Generator::new();
+    let file = File::create(path).expect("failed to create the output parquet file");
+    let mut writer = ArrowWriter::try_new(file, Arc::clone(&generator.schema), None)
+        .expect("failed to create the parquet writer");
+    let num_batches = (12_f32 * scale_factor) as usize;
+    let mut num_rows = 0;
+    for batch in generator.take(num_batches) {
+        writer
+            .write(&batch)
+            .expect("failed to write a record batch");
+        num_rows += batch.num_rows();
+    }
+    writer.close().expect("failed to finalize the parquet file");
+    println!("Generated test dataset with {} rows", num_rows);
+}
+/// One array builder per column of the access-log schema; rows are appended
+/// to all builders in lock-step and turned into a `RecordBatch` by `finish`.
+///
+/// `Default` is derived because the generator creates a fresh builder with
+/// `BatchBuilder::default()` for every batch.
+#[derive(Default)]
+struct BatchBuilder {
+    service: StringDictionaryBuilder<Int32Type>,
+    host: StringDictionaryBuilder<Int32Type>,
+    pod: StringDictionaryBuilder<Int32Type>,
+    container: StringDictionaryBuilder<Int32Type>,
+    image: StringDictionaryBuilder<Int32Type>,
+    time: TimestampNanosecondBuilder,
+    client_addr: StringBuilder,
+    request_duration: Int32Builder,
+    request_user_agent: StringBuilder,
+    request_method: StringBuilder,
+    request_host: StringBuilder,
+    request_bytes: Int32Builder,
+    response_bytes: Int32Builder,
+    response_status: UInt16Builder,
+    response_body: StringBuilder,
+}
+impl BatchBuilder {
+    /// Returns the schema of the generated access-log batches.
+    ///
+    /// The field order here must match the column order produced by `finish`.
+    fn schema() -> SchemaRef {
+        let utf8_dict =
+            || DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+        Arc::new(Schema::new(vec![
+            Field::new("service", utf8_dict(), true),
+            Field::new("host", utf8_dict(), false),
+            Field::new("pod", utf8_dict(), false),
+            Field::new("container", utf8_dict(), false),
+            Field::new("image", utf8_dict(), false),
+            Field::new(
+                "time",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+            Field::new("client_addr", DataType::Utf8, true),
+            Field::new("request_duration_ns", DataType::Int32, false),
+            Field::new("request_user_agent", DataType::Utf8, true),
+            Field::new("request_method", DataType::Utf8, true),
+            Field::new("request_host", DataType::Utf8, true),
+            Field::new("request_bytes", DataType::Int32, true),
+            Field::new("response_bytes", DataType::Int32, true),
+            Field::new("response_status", DataType::UInt16, false),
+            // This column will contain large values relative to the others
+            Field::new("response_body", DataType::Utf8, false),
+        ]))
+    }
+    /// Appends all log rows of one `service` running on `host`.
+    ///
+    /// A random number of pods (sorted by name) is generated; each pod gets one
+    /// or two containers, and each container a random number of rows whose
+    /// `time` grows with the row index.
+    fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) {
+        let num_pods = rng.gen_range(1..15);
+        let pods = generate_sorted_strings(rng, num_pods, 30..40);
+        for pod in pods {
+            for container_idx in 0..rng.gen_range(1..3) {
+                let container = format!("{}_container_{}", service, container_idx);
+                let image = format!(
+                    "{}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9",
+                    container
+                );
+                let num_entries = rng.gen_range(1024..8192);
+                for i in 0..num_entries {
+                    let time = i as i64 * 1024;
+                    self.append_row(rng, host, &pod, service, &container, &image, time);
+                }
+            }
+        }
+    }
+    /// Appends a single row: the given identifying columns plus randomly
+    /// generated request/response values.
+    ///
+    /// Every builder receives exactly one value so that all columns keep the
+    /// same length, which `finish` relies on.
+    // One argument per identifying column keeps the call site explicit.
+    #[allow(clippy::too_many_arguments)]
+    fn append_row(
+        &mut self,
+        rng: &mut StdRng,
+        host: &str,
+        pod: &str,
+        service: &str,
+        container: &str,
+        image: &str,
+        time: i64,
+    ) {
+        let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"];
+        let status = &[200, 204, 400, 503, 403];
+        self.service.append(service).unwrap();
+        // The `host` column must be filled like every other column; leaving it
+        // out makes the column lengths disagree when the batch is assembled.
+        self.host.append(host).unwrap();
+        self.pod.append(pod).unwrap();
+        self.container.append(container).unwrap();
+        self.image.append(image).unwrap();
+        self.time.append_value(time);
+        self.client_addr.append_value(format!(
+            "{}.{}.{}.{}",
+            rng.gen::<u8>(),
+            rng.gen::<u8>(),
+            rng.gen::<u8>(),
+            rng.gen::<u8>()
+        ));
+        self.request_duration.append_value(rng.gen());
+        self.request_user_agent
+            .append_value(random_string(rng, 20..100));
+        self.request_method
+            .append_value(methods[rng.gen_range(0..methods.len())]);
+        self.request_host
+            .append_value(format!("https://{}", service));
+        // Roughly 10% of the byte counts are left null.
+        self.request_bytes
+            .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
+        self.response_bytes
+            .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
+        self.response_status
+            .append_value(status[rng.gen_range(0..status.len())]);
+        self.response_body
+            .append_value(random_string(rng, 200..2000))
+    }
+    /// Consumes the builder and assembles the accumulated columns into a
+    /// `RecordBatch` with the given `schema`.
+    ///
+    /// # Panics
+    /// Panics if the columns do not match `schema`.
+    fn finish(mut self, schema: SchemaRef) -> RecordBatch {
+        // Column order mirrors the field order declared in `schema()`.
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(self.service.finish()),
+                Arc::new(self.host.finish()),
+                Arc::new(self.pod.finish()),
+                Arc::new(self.container.finish()),
+                Arc::new(self.image.finish()),
+                Arc::new(self.time.finish()),
+                Arc::new(self.client_addr.finish()),
+                Arc::new(self.request_duration.finish()),
+                Arc::new(self.request_user_agent.finish()),
+                Arc::new(self.request_method.finish()),
+                Arc::new(self.request_host.finish()),
+                Arc::new(self.request_bytes.finish()),
+                Arc::new(self.response_bytes.finish()),
+                Arc::new(self.response_status.finish()),
+                Arc::new(self.response_body.finish()),
+            ],
+        )
+        .unwrap()
+    }
+}
+/// Returns a random string of lowercase ASCII letters whose length is drawn
+/// from `len_range`.
+fn random_string(rng: &mut StdRng, len_range: Range<usize>) -> String {
+    let len = rng.gen_range(len_range);
+    (0..len)
+        .map(|_| rng.gen_range(b'a'..=b'z') as char)
+        .collect::<String>()
+}
+/// Generates `count` random strings, each with a length drawn from `str_len`,
+/// and returns them sorted in ascending order.
+fn generate_sorted_strings(
+    rng: &mut StdRng,
+    count: usize,
+    str_len: Range<usize>,
+) -> Vec<String> {
+    let mut strings: Vec<_> = (0..count)
+        .map(|_| random_string(rng, str_len.clone()))
+        .collect();
+    strings.sort_unstable();
+    strings
+}
+/// Generates sorted RecordBatch with an access log style schema for a single host
+struct Generator {
+    /// Schema shared by every generated batch.
+    schema: SchemaRef,
+    /// Seeded random number generator driving all generated values.
+    rng: StdRng,
+    /// Index of the next host; incremented once per generated batch.
+    host_idx: usize,
+}
+impl Generator {
+    /// Creates a generator starting at host index 0 with a fixed RNG seed, so
+    /// every run produces the same sequence of random values.
+    fn new() -> Self {
+        let seed = [
+            1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 6, 0, 0,
+            0, 0, 0, 5, 0, 0, 0, 0, 0,
+        ];
+        Self {
+            schema: BatchBuilder::schema(),
+            host_idx: 0,
+            rng: StdRng::from_seed(seed),
+        }
+    }
+}
+impl Iterator for Generator {
+    type Item = RecordBatch;
+    /// Produces the batch for the next host; this iterator never returns
+    /// `None`, so callers bound it themselves (e.g. with `take`).
+    ///
+    /// Each of the four services is skipped with probability 0.5, so a batch
+    /// may contain any subset of them.
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut builder = BatchBuilder::default();
+        // Derive a distinct host name from the running host index.
+        let host = format!(
+            "i-{:016x}.ec2.internal",
+            self.host_idx * 0x7d87f8ed5c5 + 0x1ec3ca3151468928
+        );
+        self.host_idx += 1;
+        for service in &["frontend", "backend", "database", "cache"] {
+            if self.rng.gen_bool(0.5) {
+                continue;
+            }
+            builder.append(&mut self.rng, &host, service);
+        }
+        Some(builder.finish(Arc::clone(&self.schema)))
+    }
+}