You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/07/13 20:57:15 UTC

[arrow-datafusion] branch master updated: Add h2o bench groupby queries (#2881)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dc9dea30 Add h2o bench groupby queries (#2881)
6dc9dea30 is described below

commit 6dc9dea30e496ad1c9880913abd73a90b6070fb2
Author: Andy Grove <ag...@apache.org>
AuthorDate: Wed Jul 13 14:57:09 2022 -0600

    Add h2o bench groupby queries (#2881)
---
 benchmarks/README.md      |  25 ++++++++++
 benchmarks/src/bin/h2o.rs | 123 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 148 insertions(+)

diff --git a/benchmarks/README.md b/benchmarks/README.md
index 3d4aedd4d..e2a8f9ec0 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -96,5 +96,30 @@ Query 'fare_amt_by_passenger' iteration 1 took 7599 ms
 Query 'fare_amt_by_passenger' iteration 2 took 7969 ms
 ```
 
+## h2o benchmarks
+
+```bash
+cargo run --release --bin h2o group-by --query 1 --path /mnt/bigdata/h2oai/N_1e7_K_1e2_single.csv --mem-table --debug
+```
+
+Example run:
+
+```
+Running benchmarks with the following options: GroupBy(GroupBy { query: 1, path: "/mnt/bigdata/h2oai/N_1e7_K_1e2_single.csv", debug: false })
+Executing select id1, sum(v1) as v1 from x group by id1
++-------+--------+
+| id1   | v1     |
++-------+--------+
+| id063 | 199420 |
+| id094 | 200127 |
+| id044 | 198886 |
+...
+| id093 | 200132 |
+| id003 | 199047 |
++-------+--------+
+
+h2o groupby query 1 took 1669 ms
+```
+
 [1]: http://www.tpc.org/tpch/
 [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
diff --git a/benchmarks/src/bin/h2o.rs b/benchmarks/src/bin/h2o.rs
new file mode 100644
index 000000000..88f4084e8
--- /dev/null
+++ b/benchmarks/src/bin/h2o.rs
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataFusion h2o benchmarks
+
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::datasource::file_format::csv::CsvFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::MemTable;
+use datafusion::prelude::{CsvReadOptions, SessionConfig};
+use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
+use std::path::PathBuf;
+use std::sync::Arc;
+use structopt::StructOpt;
+use tokio::time::Instant;
+
+#[derive(Debug, StructOpt)]
+#[structopt(name = "datafusion-h2o", about = "DataFusion h2o benchmarks")]
+enum Opt {
+    GroupBy(GroupBy), //TODO add Join queries
+}
+
+#[derive(Debug, StructOpt)]
+struct GroupBy {
+    /// Query number
+    #[structopt(short, long)]
+    query: usize,
+    /// Path to data file
+    #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
+    path: PathBuf,
+    /// Activate debug mode to see query results
+    #[structopt(short, long)]
+    debug: bool,
+    /// Load the data into a MemTable before executing the query
+    #[structopt(short = "m", long = "mem-table")]
+    mem_table: bool,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let opt = Opt::from_args();
+    println!("Running benchmarks with the following options: {:?}", opt);
+    match opt {
+        Opt::GroupBy(config) => group_by(&config).await,
+    }
+}
+
+async fn group_by(opt: &GroupBy) -> Result<()> {
+    let path = opt.path.to_str().unwrap();
+    let config = SessionConfig::from_env().with_batch_size(65535);
+
+    let ctx = SessionContext::with_config(config);
+
+    let schema = Schema::new(vec![
+        Field::new("id1", DataType::Utf8, false),
+        Field::new("id2", DataType::Utf8, false),
+        Field::new("id3", DataType::Utf8, false),
+        Field::new("id4", DataType::Int32, false),
+        Field::new("id5", DataType::Int32, false),
+        Field::new("id6", DataType::Int32, false),
+        Field::new("v1", DataType::Int32, false),
+        Field::new("v2", DataType::Int32, false),
+        Field::new("v3", DataType::Float64, false),
+    ]);
+
+    if opt.mem_table {
+        let listing_config = ListingTableConfig::new(ListingTableUrl::parse(path)?)
+            .with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default())))
+            .with_schema(Arc::new(schema));
+        let csv = ListingTable::try_new(listing_config)?;
+        let partition_size = num_cpus::get();
+        let memtable =
+            MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?;
+        ctx.register_table("x", Arc::new(memtable))?;
+    } else {
+        ctx.register_csv("x", path, CsvReadOptions::default().schema(&schema))
+            .await?;
+    }
+
+    let sql = match opt.query {
+        1 => "select id1, sum(v1) as v1 from x group by id1",
+        2 => "select id1, id2, sum(v1) as v1 from x group by id1, id2",
+        3 => "select id3, sum(v1) as v1, mean(v3) as v3 from x group by id3",
+        4 => "select id4, mean(v1) as v1, mean(v2) as v2, mean(v3) as v3 from x group by id4",
+        5 => "select id6, sum(v1) as v1, sum(v2) as v2, sum(v3) as v3 from x group by id6",
+        6 => "select id4, id5, median(v3) as median_v3, stddev(v3) as sd_v3 from x group by id4, id5",
+        7 => "select id3, max(v1)-min(v2) as range_v1_v2 from x group by id3",
+        8 => "select id6, largest2_v3 from (select id6, v3 as largest2_v3, row_number() over (partition by id6 order by v3 desc) as order_v3 from x where v3 is not null) sub_query where order_v3 <= 2",
+        9 => "select id2, id4, pow(corr(v1, v2), 2) as r2 from x group by id2, id4",
+        10 => "select id1, id2, id3, id4, id5, id6, sum(v3) as v3, count(*) as count from x group by id1, id2, id3, id4, id5, id6",
+        _ => unimplemented!(),
+    };
+
+    println!("Executing {}", sql);
+    let start = Instant::now();
+    let df = ctx.sql(sql).await?;
+    let batches = df.collect().await?;
+    let elapsed = start.elapsed().as_millis();
+
+    if opt.debug {
+        pretty::print_batches(&batches)?;
+    }
+
+    println!("h2o groupby query {} took {} ms", opt.query, elapsed);
+
+    Ok(())
+}