Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/03 15:42:29 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request #1738: Add parquet SQL benchmarks

tustvold opened a new pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738


   # Which issue does this PR close?
   
   Closes #TBD.
   
   # Rationale for this change
   
   Benchmarks good, more benchmarks more good :smile: 
   
   # What changes are included in this PR?
   
   This adds a benchmark that optionally generates a large-ish parquet file, or uses a file specified by an environment variable, and then runs through a list of queries against this file.
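
As a rough illustration of the "list of queries" step, the sketch below splits the text of a SQL file on `;`, drops whole-line `--` comments, and skips empty statements. The name `split_queries` is hypothetical and not part of this PR. Unlike the loop in the benchmark, this version trims each line before checking for the comment prefix, which is an assumption about the desired behaviour.

```rust
/// Split the text of a SQL file into individual statements.
///
/// `split_queries` is a hypothetical helper, not part of the PR.
/// Splitting on ';' is deliberately naive: a semicolon inside a string
/// literal would be treated as a statement separator.
fn split_queries(sql: &str) -> Vec<String> {
    sql.split(';')
        .map(|statement| {
            statement
                .lines()
                .map(str::trim)
                // Drop blank lines and whole-line `--` comments
                .filter(|line| !line.is_empty() && !line.starts_with("--"))
                .collect::<Vec<_>>()
                // Collapse a multi-line statement onto a single line
                .join(" ")
        })
        // Skip statements that were empty or consisted only of comments
        .filter(|statement| !statement.is_empty())
        .collect()
}
```

Each returned string could then serve both as the benchmark name and as the query text, which is how the benchmark in this PR uses its queries.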
   
   My hope is that this will supplement the TPCH benchmark with one that is perhaps easier for people to set up and run, and that can be more easily adapted to test different data shapes and queries.
   
   In particular as currently configured this will test:
   
   * Dictionary arrays
   * Nullable arrays
   * Large-ish parquet files (~200 MB)
   * Basic table scans with filters and aggregates
   * ...Suggestions welcome :smile: 
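
For a sense of scale, the constants in the benchmark source fix the row count and the number of row groups. The following is a small worked sketch, assuming the writer starts a new row group every `ROW_GROUP_SIZE` rows. The helpers `total_rows` and `expected_row_groups` are hypothetical and only restate the arithmetic.

```rust
/// Values copied from the benchmark source in this PR.
const NUM_BATCHES: usize = 2048;
const WRITE_RECORD_BATCH_SIZE: usize = 1024;
const ROW_GROUP_SIZE: usize = 1024 * 1024;

/// Total rows written: one batch of `WRITE_RECORD_BATCH_SIZE` rows per iteration.
const fn total_rows() -> usize {
    NUM_BATCHES * WRITE_RECORD_BATCH_SIZE
}

/// Hypothetical helper: number of row groups, i.e. ceil(rows / row_group_size).
const fn expected_row_groups(rows: usize, row_group_size: usize) -> usize {
    (rows + row_group_size - 1) / row_group_size
}
```

With these values the file holds 2,097,152 rows in 2 row groups, which matches the `EXPECTED_ROW_GROUPS` assertion in the generator.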
   
   It could theoretically be extended to incorporate joins, however, as I don't currently have a real-world use-case that produces these, I'd rather leave this to someone with such a workload to model a representative benchmark for.
   
   _Unfortunately the generation portion needs https://github.com/apache/arrow-rs/pull/1214 but arrow 9 should be out soon which will contain this. Will keep this as a draft until then._
   
   # Are there any user-facing changes?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold commented on pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#issuecomment-1040089702


   There are definitely tweaks that would be cool to make to this, e.g. testing different column encodings, but I think this is a decent starting point





[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#discussion_r807309526



##########
File path: datafusion/benches/parquet_query_sql.rs
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks of SQL queries against parquet data
+
+use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Field, Float64Type, Int32Type, Int64Type, Schema,
+    SchemaRef,
+};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::prelude::ExecutionContext;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use rand::distributions::uniform::SampleUniform;
+use rand::distributions::Alphanumeric;
+use rand::prelude::*;
+use std::fs::File;
+use std::io::Read;
+use std::ops::Range;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::Instant;
+use tempfile::NamedTempFile;
+use tokio_stream::StreamExt;
+
+/// The number of batches to write
+const NUM_BATCHES: usize = 2048;
+/// The number of rows in each record batch to write
+const WRITE_RECORD_BATCH_SIZE: usize = 1024;
+/// The number of rows in a row group
+const ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// The number of row groups expected
+const EXPECTED_ROW_GROUPS: usize = 2;
+
+fn schema() -> SchemaRef {
+    let string_dictionary_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+    Arc::new(Schema::new(vec![
+        Field::new("dict_10_required", string_dictionary_type.clone(), false),
+        Field::new("dict_10_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_100_required", string_dictionary_type.clone(), false),
+        Field::new("dict_100_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_1000_required", string_dictionary_type.clone(), false),
+        Field::new("dict_1000_optional", string_dictionary_type.clone(), true),
+        Field::new("string_required", DataType::Utf8, false),
+        Field::new("string_optional", DataType::Utf8, true),
+        Field::new("i64_required", DataType::Int64, false),
+        Field::new("i64_optional", DataType::Int64, true),
+        Field::new("f64_required", DataType::Float64, false),
+        Field::new("f64_optional", DataType::Float64, true),
+    ]))
+}
+
+fn generate_batch() -> RecordBatch {
+    let schema = schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+    RecordBatch::try_new(
+        schema,
+        vec![
+            generate_string_dictionary("prefix", 10, len, 1.0),
+            generate_string_dictionary("prefix", 10, len, 0.5),
+            generate_string_dictionary("prefix", 100, len, 1.0),
+            generate_string_dictionary("prefix", 100, len, 0.5),
+            generate_string_dictionary("prefix", 1000, len, 1.0),
+            generate_string_dictionary("prefix", 1000, len, 0.5),
+            generate_strings(0..100, len, 1.0),
+            generate_strings(0..100, len, 0.5),
+            generate_primitive::<Int64Type>(len, 1.0, -2000..2000),
+            generate_primitive::<Int64Type>(len, 0.5, -2000..2000),
+            generate_primitive::<Float64Type>(len, 1.0, -1000.0..1000.0),
+            generate_primitive::<Float64Type>(len, 0.5, -1000.0..1000.0),
+        ],
+    )
+    .unwrap()
+}
+
+fn generate_string_dictionary(
+    prefix: &str,
+    cardinality: usize,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    let strings: Vec<_> = (0..cardinality)
+        .map(|x| format!("{}#{}", prefix, x))
+        .collect();
+
+    Arc::new(DictionaryArray::<Int32Type>::from_iter((0..len).map(
+        |_| {
+            rng.gen_bool(valid_percent)
+                .then(|| strings[rng.gen_range(0..cardinality)].as_str())
+        },
+    )))
+}
+
+fn generate_strings(
+    string_length_range: Range<usize>,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    Arc::new(StringArray::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent).then(|| {
+            let string_len = rng.gen_range(string_length_range.clone());
+            (0..string_len)
+                .map(|_| char::from(rng.sample(Alphanumeric)))
+                .collect::<String>()
+        })
+    })))
+}
+
+fn generate_primitive<T>(
+    len: usize,
+    valid_percent: f64,
+    range: Range<T::Native>,
+) -> ArrayRef
+where
+    T: ArrowPrimitiveType,
+    T::Native: SampleUniform,
+{
+    let mut rng = thread_rng();
+    Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent)
+            .then(|| rng.gen_range(range.clone()))
+    })))
+}
+
+fn generate_file() -> NamedTempFile {
+    let now = Instant::now();
+    let named_file = tempfile::Builder::new()
+        .prefix("parquet_query_sql")
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    println!("Generating parquet file - {}", named_file.path().display());
+    let schema = schema();
+
+    let properties = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_size(ROW_GROUP_SIZE)
+        .build();
+
+    let file = named_file.as_file().try_clone().unwrap();
+    let mut writer = ArrowWriter::try_new(file, schema, Some(properties)).unwrap();
+
+    for _ in 0..NUM_BATCHES {
+        let batch = generate_batch();
+        writer.write(&batch).unwrap();
+    }
+
+    let metadata = writer.close().unwrap();
+    assert_eq!(
+        metadata.num_rows as usize,
+        WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
+    );
+    assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);
+
+    println!(
+        "Generated parquet file in {} seconds",
+        now.elapsed().as_secs_f32()
+    );
+
+    named_file
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {
+        Ok(file) => (file, None),
+        Err(_) => {
+            let temp_file = generate_file();
+            (temp_file.path().display().to_string(), Some(temp_file))
+        }
+    };
+
+    assert!(Path::new(&file_path).exists(), "path not found");
+    println!("Using parquet file {}", file_path);
+
+    let mut context = ExecutionContext::new();
+
+    let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+    rt.block_on(context.register_parquet("t", file_path.as_str()))
+        .unwrap();
+
+    // We read the queries from a file so they can be changed without recompiling the benchmark
+    let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
+    let mut queries = String::new();
+    queries_file.read_to_string(&mut queries).unwrap();
+
+    for query in queries.split(';') {
+        let query = query.trim();
+
+        // Remove comment lines
+        let query: Vec<_> = query.split('\n').filter(|x| !x.starts_with("--")).collect();
+        let query = query.join(" ");
+
+        // Ignore blank lines
+        if query.is_empty() {
+            continue;
+        }
+
+        let query = query.as_str();
+        c.bench_function(query, |b| {
+            b.iter(|| {
+                let mut context = context.clone();
+                rt.block_on(async move {
+                    let query = context.sql(query).await.unwrap();
+                    let mut stream = query.execute_stream().await.unwrap();
+                    while let Some(_) = criterion::black_box(stream.next().await) {}
+                })
+            });
+        });
+    }
+
+    // Clean up temporary file if any
+    std::mem::drop(temp_file);

Review comment:
       It was intended as a hint that the lifetime of `temp_file` matters, i.e. it must live to the end of the benchmark block. In the past I've accidentally refactored tests with `NamedTempFile` and they have broken in odd ways that boiled down to the temporary file getting cleaned up too early.
   
   I'll clarify the comment
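
A minimal sketch of the failure mode described here, using only the standard library. `TempGuard` is a hypothetical stand-in for `tempfile::NamedTempFile` (it deletes its file on drop), and `path_only` / `path_and_guard` are illustrative names, not code from this PR.

```rust
use std::fs;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for `tempfile::NamedTempFile`:
/// owns a file on disk and deletes it when dropped.
struct TempGuard {
    path: PathBuf,
}

impl TempGuard {
    fn create(name: &str) -> std::io::Result<Self> {
        // Prefix with the process id to reduce the chance of name clashes
        let file_name = format!("{}_{}", std::process::id(), name);
        let path = std::env::temp_dir().join(file_name);
        fs::write(&path, b"data")?;
        Ok(Self { path })
    }

    fn path(&self) -> &Path {
        &self.path
    }
}

impl Drop for TempGuard {
    fn drop(&mut self) {
        // Best-effort cleanup; errors are ignored
        let _ = fs::remove_file(&self.path);
    }
}

/// Pitfall: only the path escapes. The guard is a temporary that is
/// dropped at the end of the `let` statement, so the file is already gone.
fn path_only(name: &str) -> PathBuf {
    let path = TempGuard::create(name).unwrap().path().to_path_buf();
    path
}

/// Fix: hand the guard back with the path so the caller decides
/// how long the file lives.
fn path_and_guard(name: &str) -> (PathBuf, TempGuard) {
    let guard = TempGuard::create(name).unwrap();
    (guard.path().to_path_buf(), guard)
}
```

The second shape mirrors the `(file_path, temp_file)` tuple in the benchmark: as long as the guard half of the tuple is alive, the path stays valid.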







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#discussion_r807046905



##########
File path: datafusion/benches/parquet_query_sql.rs
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks of SQL queries against parquet data
+
+use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Field, Float64Type, Int32Type, Int64Type, Schema,
+    SchemaRef,
+};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::prelude::ExecutionContext;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use rand::distributions::uniform::SampleUniform;
+use rand::distributions::Alphanumeric;
+use rand::prelude::*;
+use std::fs::File;
+use std::io::Read;
+use std::ops::Range;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::Instant;
+use tempfile::NamedTempFile;
+use tokio_stream::StreamExt;
+
+/// The number of batches to write
+const NUM_BATCHES: usize = 2048;
+/// The number of rows in each record batch to write
+const WRITE_RECORD_BATCH_SIZE: usize = 1024;
+/// The number of rows in a row group
+const ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// The number of row groups expected
+const EXPECTED_ROW_GROUPS: usize = 2;
+
+fn schema() -> SchemaRef {
+    let string_dictionary_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+    Arc::new(Schema::new(vec![
+        Field::new("dict_10_required", string_dictionary_type.clone(), false),
+        Field::new("dict_10_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_100_required", string_dictionary_type.clone(), false),
+        Field::new("dict_100_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_1000_required", string_dictionary_type.clone(), false),
+        Field::new("dict_1000_optional", string_dictionary_type.clone(), true),
+        Field::new("string_required", DataType::Utf8, false),
+        Field::new("string_optional", DataType::Utf8, true),
+        Field::new("i64_required", DataType::Int64, false),
+        Field::new("i64_optional", DataType::Int64, true),
+        Field::new("f64_required", DataType::Float64, false),
+        Field::new("f64_optional", DataType::Float64, true),
+    ]))
+}
+
+fn generate_batch() -> RecordBatch {
+    let schema = schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+    RecordBatch::try_new(
+        schema,
+        vec![
+            generate_string_dictionary("prefix", 10, len, 1.0),
+            generate_string_dictionary("prefix", 10, len, 0.5),
+            generate_string_dictionary("prefix", 100, len, 1.0),
+            generate_string_dictionary("prefix", 100, len, 0.5),
+            generate_string_dictionary("prefix", 1000, len, 1.0),
+            generate_string_dictionary("prefix", 1000, len, 0.5),
+            generate_strings(0..100, len, 1.0),
+            generate_strings(0..100, len, 0.5),
+            generate_primitive::<Int64Type>(len, 1.0, -2000..2000),
+            generate_primitive::<Int64Type>(len, 0.5, -2000..2000),
+            generate_primitive::<Float64Type>(len, 1.0, -1000.0..1000.0),
+            generate_primitive::<Float64Type>(len, 0.5, -1000.0..1000.0),
+        ],
+    )
+    .unwrap()
+}
+
+fn generate_string_dictionary(
+    prefix: &str,
+    cardinality: usize,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    let strings: Vec<_> = (0..cardinality)
+        .map(|x| format!("{}#{}", prefix, x))
+        .collect();
+
+    Arc::new(DictionaryArray::<Int32Type>::from_iter((0..len).map(
+        |_| {
+            rng.gen_bool(valid_percent)
+                .then(|| strings[rng.gen_range(0..cardinality)].as_str())
+        },
+    )))
+}
+
+fn generate_strings(
+    string_length_range: Range<usize>,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    Arc::new(StringArray::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent).then(|| {
+            let string_len = rng.gen_range(string_length_range.clone());
+            (0..string_len)
+                .map(|_| char::from(rng.sample(Alphanumeric)))
+                .collect::<String>()
+        })
+    })))
+}
+
+fn generate_primitive<T>(
+    len: usize,
+    valid_percent: f64,
+    range: Range<T::Native>,
+) -> ArrayRef
+where
+    T: ArrowPrimitiveType,
+    T::Native: SampleUniform,
+{
+    let mut rng = thread_rng();
+    Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent)
+            .then(|| rng.gen_range(range.clone()))
+    })))
+}
+
+fn generate_file() -> NamedTempFile {
+    let now = Instant::now();
+    let named_file = tempfile::Builder::new()
+        .prefix("parquet_query_sql")
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    println!("Generating parquet file - {}", named_file.path().display());
+    let schema = schema();
+
+    let properties = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_size(ROW_GROUP_SIZE)
+        .build();
+
+    let file = named_file.as_file().try_clone().unwrap();
+    let mut writer = ArrowWriter::try_new(file, schema, Some(properties)).unwrap();
+
+    for _ in 0..NUM_BATCHES {
+        let batch = generate_batch();
+        writer.write(&batch).unwrap();
+    }
+
+    let metadata = writer.close().unwrap();
+    assert_eq!(
+        metadata.num_rows as usize,
+        WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
+    );
+    assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);
+
+    println!(
+        "Generated parquet file in {} seconds",
+        now.elapsed().as_secs_f32()
+    );
+
+    named_file
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {

Review comment:
       This is a neat feature (being able to override the file being tested using the `PARQUET_FILE` environment variable).
   
   I wonder if it would be possible to add a note about this somewhere in https://github.com/apache/arrow-datafusion/blob/master/DEVELOPERS.md? Perhaps in a "how to run benchmarks" section?
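
For readers skimming the thread, the override boils down to "use the environment variable if set, otherwise generate a file". A hedged sketch of that decision follows; the `Input` enum and both function names are hypothetical, and treating an empty value as unset is an assumption that the benchmark itself does not make.

```rust
use std::env;
use std::path::PathBuf;

/// Where the benchmark input comes from (hypothetical type for illustration).
#[derive(Debug, PartialEq)]
enum Input {
    /// A user-supplied file taken from the environment.
    Provided(PathBuf),
    /// No override was set, so the caller should generate a file.
    Generate,
}

/// Decide on the input given the raw value of the override, if any.
/// Assumption: an empty string is treated the same as an unset variable.
fn resolve_input(override_path: Option<String>) -> Input {
    match override_path {
        Some(path) if !path.is_empty() => Input::Provided(PathBuf::from(path)),
        _ => Input::Generate,
    }
}

/// Read the `PARQUET_FILE` variable named in this PR and resolve the input.
fn input_from_env() -> Input {
    resolve_input(env::var("PARQUET_FILE").ok())
}
```

Keeping the decision separate from the environment lookup makes it easy to test without mutating process-wide state.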

##########
File path: datafusion/benches/parquet_query_sql.rs
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks of SQL queries against parquet data
+
+use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Field, Float64Type, Int32Type, Int64Type, Schema,
+    SchemaRef,
+};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::prelude::ExecutionContext;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use rand::distributions::uniform::SampleUniform;
+use rand::distributions::Alphanumeric;
+use rand::prelude::*;
+use std::fs::File;
+use std::io::Read;
+use std::ops::Range;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::Instant;
+use tempfile::NamedTempFile;
+use tokio_stream::StreamExt;
+
+/// The number of batches to write
+const NUM_BATCHES: usize = 2048;
+/// The number of rows in each record batch to write
+const WRITE_RECORD_BATCH_SIZE: usize = 1024;
+/// The number of rows in a row group
+const ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// The number of row groups expected
+const EXPECTED_ROW_GROUPS: usize = 2;
+
+fn schema() -> SchemaRef {
+    let string_dictionary_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+    Arc::new(Schema::new(vec![
+        Field::new("dict_10_required", string_dictionary_type.clone(), false),
+        Field::new("dict_10_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_100_required", string_dictionary_type.clone(), false),
+        Field::new("dict_100_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_1000_required", string_dictionary_type.clone(), false),
+        Field::new("dict_1000_optional", string_dictionary_type.clone(), true),
+        Field::new("string_required", DataType::Utf8, false),
+        Field::new("string_optional", DataType::Utf8, true),
+        Field::new("i64_required", DataType::Int64, false),
+        Field::new("i64_optional", DataType::Int64, true),
+        Field::new("f64_required", DataType::Float64, false),
+        Field::new("f64_optional", DataType::Float64, true),
+    ]))
+}
+
+fn generate_batch() -> RecordBatch {
+    let schema = schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+    RecordBatch::try_new(
+        schema,
+        vec![
+            generate_string_dictionary("prefix", 10, len, 1.0),
+            generate_string_dictionary("prefix", 10, len, 0.5),
+            generate_string_dictionary("prefix", 100, len, 1.0),
+            generate_string_dictionary("prefix", 100, len, 0.5),
+            generate_string_dictionary("prefix", 1000, len, 1.0),
+            generate_string_dictionary("prefix", 1000, len, 0.5),
+            generate_strings(0..100, len, 1.0),
+            generate_strings(0..100, len, 0.5),
+            generate_primitive::<Int64Type>(len, 1.0, -2000..2000),
+            generate_primitive::<Int64Type>(len, 0.5, -2000..2000),
+            generate_primitive::<Float64Type>(len, 1.0, -1000.0..1000.0),
+            generate_primitive::<Float64Type>(len, 0.5, -1000.0..1000.0),
+        ],
+    )
+    .unwrap()
+}
+
+fn generate_string_dictionary(
+    prefix: &str,
+    cardinality: usize,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    let strings: Vec<_> = (0..cardinality)
+        .map(|x| format!("{}#{}", prefix, x))
+        .collect();
+
+    Arc::new(DictionaryArray::<Int32Type>::from_iter((0..len).map(
+        |_| {
+            rng.gen_bool(valid_percent)
+                .then(|| strings[rng.gen_range(0..cardinality)].as_str())
+        },
+    )))
+}
+
+fn generate_strings(
+    string_length_range: Range<usize>,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    Arc::new(StringArray::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent).then(|| {
+            let string_len = rng.gen_range(string_length_range.clone());
+            (0..string_len)
+                .map(|_| char::from(rng.sample(Alphanumeric)))
+                .collect::<String>()
+        })
+    })))
+}
+
+fn generate_primitive<T>(
+    len: usize,
+    valid_percent: f64,
+    range: Range<T::Native>,
+) -> ArrayRef
+where
+    T: ArrowPrimitiveType,
+    T::Native: SampleUniform,
+{
+    let mut rng = thread_rng();
+    Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent)
+            .then(|| rng.gen_range(range.clone()))
+    })))
+}
+
+fn generate_file() -> NamedTempFile {
+    let now = Instant::now();
+    let named_file = tempfile::Builder::new()
+        .prefix("parquet_query_sql")
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    println!("Generating parquet file - {}", named_file.path().display());
+    let schema = schema();
+
+    let properties = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_size(ROW_GROUP_SIZE)
+        .build();
+
+    let file = named_file.as_file().try_clone().unwrap();
+    let mut writer = ArrowWriter::try_new(file, schema, Some(properties)).unwrap();
+
+    for _ in 0..NUM_BATCHES {
+        let batch = generate_batch();
+        writer.write(&batch).unwrap();
+    }
+
+    let metadata = writer.close().unwrap();
+    assert_eq!(
+        metadata.num_rows as usize,
+        WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
+    );
+    assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);
+
+    println!(
+        "Generated parquet file in {} seconds",
+        now.elapsed().as_secs_f32()
+    );
+
+    named_file
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {
+        Ok(file) => (file, None),
+        Err(_) => {
+            let temp_file = generate_file();
+            (temp_file.path().display().to_string(), Some(temp_file))
+        }
+    };
+
+    assert!(Path::new(&file_path).exists(), "path not found");
+    println!("Using parquet file {}", file_path);
+
+    let mut context = ExecutionContext::new();
+
+    let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+    rt.block_on(context.register_parquet("t", file_path.as_str()))
+        .unwrap();
+
+    // We read the queries from a file so they can be changed without recompiling the benchmark
+    let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
+    let mut queries = String::new();
+    queries_file.read_to_string(&mut queries).unwrap();
+
+    for query in queries.split(';') {
+        let query = query.trim();
+
+        // Remove comment lines
+        let query: Vec<_> = query.split('\n').filter(|x| !x.starts_with("--")).collect();
+        let query = query.join(" ");
+
+        // Ignore blank lines
+        if query.is_empty() {
+            continue;
+        }
+
+        let query = query.as_str();
+        c.bench_function(query, |b| {
+            b.iter(|| {
+                let mut context = context.clone();
+                rt.block_on(async move {
+                    let query = context.sql(query).await.unwrap();
+                    let mut stream = query.execute_stream().await.unwrap();
+                    while let Some(_) = criterion::black_box(stream.next().await) {}
+                })
+            });
+        });
+    }
+
+    // Clean up temporary file if any
+    std::mem::drop(temp_file);

Review comment:
       Why do we need to drop the temp file explicitly? Won't it automatically happen when the variable goes out of scope?
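
To make the question concrete, here is a small sketch showing that a named binding is dropped at the end of its scope whether or not `drop` is called, so an explicit call at the very end only documents intent. `Noisy`, `Log`, and the function names are hypothetical and exist only for this illustration.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared event log used to observe when values are dropped.
type Log = Rc<RefCell<Vec<String>>>;

/// Hypothetical type that records its own destruction.
struct Noisy {
    name: &'static str,
    log: Log,
}

impl Drop for Noisy {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("drop {}", self.name));
    }
}

fn implicit(log: &Log) {
    // A named binding (even one starting with `_`) lives to the end of the scope
    let _guard = Noisy { name: "implicit", log: Rc::clone(log) };
    log.borrow_mut().push("work".to_string());
    // `_guard` is dropped here automatically
}

fn explicit(log: &Log) {
    let guard = Noisy { name: "explicit", log: Rc::clone(log) };
    log.borrow_mut().push("work".to_string());
    // Same point in time as the implicit case, just spelled out
    drop(guard);
}

/// Run both variants and return the recorded order of events.
fn run() -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    implicit(&log);
    explicit(&log);
    let events = log.borrow().clone();
    events
}
```

Both variants record "work" before the drop. The case to watch for is `let _ = value;`, which does not bind the value and drops it immediately.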







[GitHub] [arrow-datafusion] alamb commented on pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#issuecomment-1038114256


   Once https://github.com/apache/arrow-datafusion/pull/1775 merges, we can probably clean up this PR and get it merged





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#discussion_r807040517



##########
File path: dev/release/rat_exclude_files.txt
##########
@@ -116,6 +116,7 @@ ci/*
 **/*.svg
 **/*.csv
 **/*.json
+**/*.sql

Review comment:
       This is fine in my opinion







[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#discussion_r806670842



##########
File path: dev/release/rat_exclude_files.txt
##########
@@ -116,6 +116,7 @@ ci/*
 **/*.svg
 **/*.csv
 **/*.json
+**/*.sql

Review comment:
       I _think_ this is the correct thing to do, but someone should probably verify if RAT is needed for SQL files










[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#discussion_r807040517



##########
File path: dev/release/rat_exclude_files.txt
##########
@@ -116,6 +116,7 @@ ci/*
 **/*.svg
 **/*.csv
 **/*.json
+**/*.sql

Review comment:
       This is fine in my opinion

##########
File path: datafusion/benches/parquet_query_sql.rs
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks of SQL queries against parquet data
+
+use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Field, Float64Type, Int32Type, Int64Type, Schema,
+    SchemaRef,
+};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::prelude::ExecutionContext;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use rand::distributions::uniform::SampleUniform;
+use rand::distributions::Alphanumeric;
+use rand::prelude::*;
+use std::fs::File;
+use std::io::Read;
+use std::ops::Range;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::Instant;
+use tempfile::NamedTempFile;
+use tokio_stream::StreamExt;
+
+/// The number of batches to write
+const NUM_BATCHES: usize = 2048;
+/// The number of rows in each record batch to write
+const WRITE_RECORD_BATCH_SIZE: usize = 1024;
+/// The number of rows in a row group
+const ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// The number of row groups expected
+const EXPECTED_ROW_GROUPS: usize = 2;
+
+fn schema() -> SchemaRef {
+    let string_dictionary_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+    Arc::new(Schema::new(vec![
+        Field::new("dict_10_required", string_dictionary_type.clone(), false),
+        Field::new("dict_10_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_100_required", string_dictionary_type.clone(), false),
+        Field::new("dict_100_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_1000_required", string_dictionary_type.clone(), false),
+        Field::new("dict_1000_optional", string_dictionary_type.clone(), true),
+        Field::new("string_required", DataType::Utf8, false),
+        Field::new("string_optional", DataType::Utf8, true),
+        Field::new("i64_required", DataType::Int64, false),
+        Field::new("i64_optional", DataType::Int64, true),
+        Field::new("f64_required", DataType::Float64, false),
+        Field::new("f64_optional", DataType::Float64, true),
+    ]))
+}
+
+fn generate_batch() -> RecordBatch {
+    let schema = schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+    RecordBatch::try_new(
+        schema,
+        vec![
+            generate_string_dictionary("prefix", 10, len, 1.0),
+            generate_string_dictionary("prefix", 10, len, 0.5),
+            generate_string_dictionary("prefix", 100, len, 1.0),
+            generate_string_dictionary("prefix", 100, len, 0.5),
+            generate_string_dictionary("prefix", 1000, len, 1.0),
+            generate_string_dictionary("prefix", 1000, len, 0.5),
+            generate_strings(0..100, len, 1.0),
+            generate_strings(0..100, len, 0.5),
+            generate_primitive::<Int64Type>(len, 1.0, -2000..2000),
+            generate_primitive::<Int64Type>(len, 0.5, -2000..2000),
+            generate_primitive::<Float64Type>(len, 1.0, -1000.0..1000.0),
+            generate_primitive::<Float64Type>(len, 0.5, -1000.0..1000.0),
+        ],
+    )
+    .unwrap()
+}
+
+fn generate_string_dictionary(
+    prefix: &str,
+    cardinality: usize,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    let strings: Vec<_> = (0..cardinality)
+        .map(|x| format!("{}#{}", prefix, x))
+        .collect();
+
+    Arc::new(DictionaryArray::<Int32Type>::from_iter((0..len).map(
+        |_| {
+            rng.gen_bool(valid_percent)
+                .then(|| strings[rng.gen_range(0..cardinality)].as_str())
+        },
+    )))
+}
+
+fn generate_strings(
+    string_length_range: Range<usize>,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    Arc::new(StringArray::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent).then(|| {
+            let string_len = rng.gen_range(string_length_range.clone());
+            (0..string_len)
+                .map(|_| char::from(rng.sample(Alphanumeric)))
+                .collect::<String>()
+        })
+    })))
+}
+
+fn generate_primitive<T>(
+    len: usize,
+    valid_percent: f64,
+    range: Range<T::Native>,
+) -> ArrayRef
+where
+    T: ArrowPrimitiveType,
+    T::Native: SampleUniform,
+{
+    let mut rng = thread_rng();
+    Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent)
+            .then(|| rng.gen_range(range.clone()))
+    })))
+}
+
+fn generate_file() -> NamedTempFile {
+    let now = Instant::now();
+    let named_file = tempfile::Builder::new()
+        .prefix("parquet_query_sql")
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    println!("Generating parquet file - {}", named_file.path().display());
+    let schema = schema();
+
+    let properties = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_size(ROW_GROUP_SIZE)
+        .build();
+
+    let file = named_file.as_file().try_clone().unwrap();
+    let mut writer = ArrowWriter::try_new(file, schema, Some(properties)).unwrap();
+
+    for _ in 0..NUM_BATCHES {
+        let batch = generate_batch();
+        writer.write(&batch).unwrap();
+    }
+
+    let metadata = writer.close().unwrap();
+    assert_eq!(
+        metadata.num_rows as usize,
+        WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
+    );
+    assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);
+
+    println!(
+        "Generated parquet file in {} seconds",
+        now.elapsed().as_secs_f32()
+    );
+
+    named_file
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {

Review comment:
       This is a neat feature (being able to override the file being tested using the `PARQUET_FILE` environment variable).
   
   I wonder if it would be possible to add a note about this in https://github.com/apache/arrow-datafusion/blob/master/DEVELOPERS.md somewhere? Perhaps in a "how to run benchmarks" section?
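
The override could be described along the following lines. This is only a minimal sketch of the fallback that `criterion_benchmark` performs: use the file named by `PARQUET_FILE` when it is set, otherwise generate one. The helper `resolve_input` and the placeholder name `generated.parquet` are hypothetical and not part of the PR.

```rust
use std::env;
use std::path::PathBuf;

/// Picks the file to benchmark against.
///
/// `override_path` is the value of the environment variable, if any.
/// `generate` is a hypothetical stand-in for the benchmark's `generate_file()`.
/// The returned flag reports whether the file had to be generated.
fn resolve_input(
    override_path: Option<String>,
    generate: impl FnOnce() -> PathBuf,
) -> (PathBuf, bool) {
    match override_path {
        // A user-supplied file: use it as-is, nothing to clean up later
        Some(path) => (PathBuf::from(path), false),
        // No override: fall back to generating a file
        None => (generate(), true),
    }
}

fn main() {
    let (path, generated) = resolve_input(env::var("PARQUET_FILE").ok(), || {
        PathBuf::from("generated.parquet")
    });
    println!("Using parquet file {} (generated: {})", path.display(), generated);
}
```

With the benchmark itself this would presumably be invoked as `PARQUET_FILE=/path/to/file.parquet cargo bench --bench parquet_query_sql`, and the supplied file would need the columns that the queries in `benches/parquet_query_sql.sql` refer to.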

##########
File path: datafusion/benches/parquet_query_sql.rs
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks of SQL queries against parquet data
+
+use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Field, Float64Type, Int32Type, Int64Type, Schema,
+    SchemaRef,
+};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::prelude::ExecutionContext;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use rand::distributions::uniform::SampleUniform;
+use rand::distributions::Alphanumeric;
+use rand::prelude::*;
+use std::fs::File;
+use std::io::Read;
+use std::ops::Range;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::Instant;
+use tempfile::NamedTempFile;
+use tokio_stream::StreamExt;
+
+/// The number of batches to write
+const NUM_BATCHES: usize = 2048;
+/// The number of rows in each record batch to write
+const WRITE_RECORD_BATCH_SIZE: usize = 1024;
+/// The number of rows in a row group
+const ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// The number of row groups expected
+const EXPECTED_ROW_GROUPS: usize = 2;
+
+fn schema() -> SchemaRef {
+    let string_dictionary_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+    Arc::new(Schema::new(vec![
+        Field::new("dict_10_required", string_dictionary_type.clone(), false),
+        Field::new("dict_10_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_100_required", string_dictionary_type.clone(), false),
+        Field::new("dict_100_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_1000_required", string_dictionary_type.clone(), false),
+        Field::new("dict_1000_optional", string_dictionary_type.clone(), true),
+        Field::new("string_required", DataType::Utf8, false),
+        Field::new("string_optional", DataType::Utf8, true),
+        Field::new("i64_required", DataType::Int64, false),
+        Field::new("i64_optional", DataType::Int64, true),
+        Field::new("f64_required", DataType::Float64, false),
+        Field::new("f64_optional", DataType::Float64, true),
+    ]))
+}
+
+fn generate_batch() -> RecordBatch {
+    let schema = schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+    RecordBatch::try_new(
+        schema,
+        vec![
+            generate_string_dictionary("prefix", 10, len, 1.0),
+            generate_string_dictionary("prefix", 10, len, 0.5),
+            generate_string_dictionary("prefix", 100, len, 1.0),
+            generate_string_dictionary("prefix", 100, len, 0.5),
+            generate_string_dictionary("prefix", 1000, len, 1.0),
+            generate_string_dictionary("prefix", 1000, len, 0.5),
+            generate_strings(0..100, len, 1.0),
+            generate_strings(0..100, len, 0.5),
+            generate_primitive::<Int64Type>(len, 1.0, -2000..2000),
+            generate_primitive::<Int64Type>(len, 0.5, -2000..2000),
+            generate_primitive::<Float64Type>(len, 1.0, -1000.0..1000.0),
+            generate_primitive::<Float64Type>(len, 0.5, -1000.0..1000.0),
+        ],
+    )
+    .unwrap()
+}
+
+fn generate_string_dictionary(
+    prefix: &str,
+    cardinality: usize,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    let strings: Vec<_> = (0..cardinality)
+        .map(|x| format!("{}#{}", prefix, x))
+        .collect();
+
+    Arc::new(DictionaryArray::<Int32Type>::from_iter((0..len).map(
+        |_| {
+            rng.gen_bool(valid_percent)
+                .then(|| strings[rng.gen_range(0..cardinality)].as_str())
+        },
+    )))
+}
+
+fn generate_strings(
+    string_length_range: Range<usize>,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    Arc::new(StringArray::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent).then(|| {
+            let string_len = rng.gen_range(string_length_range.clone());
+            (0..string_len)
+                .map(|_| char::from(rng.sample(Alphanumeric)))
+                .collect::<String>()
+        })
+    })))
+}
+
+fn generate_primitive<T>(
+    len: usize,
+    valid_percent: f64,
+    range: Range<T::Native>,
+) -> ArrayRef
+where
+    T: ArrowPrimitiveType,
+    T::Native: SampleUniform,
+{
+    let mut rng = thread_rng();
+    Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent)
+            .then(|| rng.gen_range(range.clone()))
+    })))
+}
+
+fn generate_file() -> NamedTempFile {
+    let now = Instant::now();
+    let named_file = tempfile::Builder::new()
+        .prefix("parquet_query_sql")
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    println!("Generating parquet file - {}", named_file.path().display());
+    let schema = schema();
+
+    let properties = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_size(ROW_GROUP_SIZE)
+        .build();
+
+    let file = named_file.as_file().try_clone().unwrap();
+    let mut writer = ArrowWriter::try_new(file, schema, Some(properties)).unwrap();
+
+    for _ in 0..NUM_BATCHES {
+        let batch = generate_batch();
+        writer.write(&batch).unwrap();
+    }
+
+    let metadata = writer.close().unwrap();
+    assert_eq!(
+        metadata.num_rows as usize,
+        WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
+    );
+    assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);
+
+    println!(
+        "Generated parquet file in {} seconds",
+        now.elapsed().as_secs_f32()
+    );
+
+    named_file
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {
+        Ok(file) => (file, None),
+        Err(_) => {
+            let temp_file = generate_file();
+            (temp_file.path().display().to_string(), Some(temp_file))
+        }
+    };
+
+    assert!(Path::new(&file_path).exists(), "path not found");
+    println!("Using parquet file {}", file_path);
+
+    let mut context = ExecutionContext::new();
+
+    let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+    rt.block_on(context.register_parquet("t", file_path.as_str()))
+        .unwrap();
+
+    // We read the queries from a file so they can be changed without recompiling the benchmark
+    let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
+    let mut queries = String::new();
+    queries_file.read_to_string(&mut queries).unwrap();
+
+    for query in queries.split(';') {
+        let query = query.trim();
+
+        // Remove comment lines
+        let query: Vec<_> = query.split('\n').filter(|x| !x.starts_with("--")).collect();
+        let query = query.join(" ");
+
+        // Ignore blank lines
+        if query.is_empty() {
+            continue;
+        }
+
+        let query = query.as_str();
+        c.bench_function(query, |b| {
+            b.iter(|| {
+                let mut context = context.clone();
+                rt.block_on(async move {
+                    let query = context.sql(query).await.unwrap();
+                    let mut stream = query.execute_stream().await.unwrap();
+                    while let Some(_) = criterion::black_box(stream.next().await) {}
+                })
+            });
+        });
+    }
+
+    // Clean up temporary file if any
+    std::mem::drop(temp_file);

Review comment:
       Why do we need to drop the temp file explicitly?  Won't it automatically happen when the variable goes out of scope?







[GitHub] [arrow-datafusion] tustvold commented on pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#issuecomment-1040089702


   There are definitely tweaks that would be cool to make to this, e.g. testing different column encodings, but I think this is a decent starting point
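
One thing to keep in mind when tweaking the generator constants is that `generate_file` asserts on the number of row groups written. The sketch below reproduces that arithmetic with the values from the quoted source; `expected_row_groups` is a hypothetical helper, and it assumes the writer fills each row group up to the configured maximum.

```rust
/// Values copied from the benchmark source quoted in this thread.
const NUM_BATCHES: usize = 2048;
const WRITE_RECORD_BATCH_SIZE: usize = 1024;
const ROW_GROUP_SIZE: usize = 1024 * 1024;

/// Number of row groups needed to hold `total_rows` when each group holds
/// at most `max_rows` rows (ceiling division). Hypothetical helper.
fn expected_row_groups(total_rows: usize, max_rows: usize) -> usize {
    (total_rows + max_rows - 1) / max_rows
}

fn main() {
    let total_rows = NUM_BATCHES * WRITE_RECORD_BATCH_SIZE;
    // 2048 * 1024 = 2097152 rows, split into groups of 1048576 rows each
    println!(
        "rows: {}, row groups: {}",
        total_rows,
        expected_row_groups(total_rows, ROW_GROUP_SIZE)
    );
}
```

Changing `NUM_BATCHES` or `ROW_GROUP_SIZE` without updating `EXPECTED_ROW_GROUPS` accordingly would trip the assertion in `generate_file`.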





[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#discussion_r807309526



##########
File path: datafusion/benches/parquet_query_sql.rs
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks of SQL queries against parquet data
+
+use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Field, Float64Type, Int32Type, Int64Type, Schema,
+    SchemaRef,
+};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::prelude::ExecutionContext;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use rand::distributions::uniform::SampleUniform;
+use rand::distributions::Alphanumeric;
+use rand::prelude::*;
+use std::fs::File;
+use std::io::Read;
+use std::ops::Range;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::Instant;
+use tempfile::NamedTempFile;
+use tokio_stream::StreamExt;
+
+/// The number of batches to write
+const NUM_BATCHES: usize = 2048;
+/// The number of rows in each record batch to write
+const WRITE_RECORD_BATCH_SIZE: usize = 1024;
+/// The number of rows in a row group
+const ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// The number of row groups expected
+const EXPECTED_ROW_GROUPS: usize = 2;
+
+fn schema() -> SchemaRef {
+    let string_dictionary_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+    Arc::new(Schema::new(vec![
+        Field::new("dict_10_required", string_dictionary_type.clone(), false),
+        Field::new("dict_10_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_100_required", string_dictionary_type.clone(), false),
+        Field::new("dict_100_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_1000_required", string_dictionary_type.clone(), false),
+        Field::new("dict_1000_optional", string_dictionary_type.clone(), true),
+        Field::new("string_required", DataType::Utf8, false),
+        Field::new("string_optional", DataType::Utf8, true),
+        Field::new("i64_required", DataType::Int64, false),
+        Field::new("i64_optional", DataType::Int64, true),
+        Field::new("f64_required", DataType::Float64, false),
+        Field::new("f64_optional", DataType::Float64, true),
+    ]))
+}
+
+fn generate_batch() -> RecordBatch {
+    let schema = schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+    RecordBatch::try_new(
+        schema,
+        vec![
+            generate_string_dictionary("prefix", 10, len, 1.0),
+            generate_string_dictionary("prefix", 10, len, 0.5),
+            generate_string_dictionary("prefix", 100, len, 1.0),
+            generate_string_dictionary("prefix", 100, len, 0.5),
+            generate_string_dictionary("prefix", 1000, len, 1.0),
+            generate_string_dictionary("prefix", 1000, len, 0.5),
+            generate_strings(0..100, len, 1.0),
+            generate_strings(0..100, len, 0.5),
+            generate_primitive::<Int64Type>(len, 1.0, -2000..2000),
+            generate_primitive::<Int64Type>(len, 0.5, -2000..2000),
+            generate_primitive::<Float64Type>(len, 1.0, -1000.0..1000.0),
+            generate_primitive::<Float64Type>(len, 0.5, -1000.0..1000.0),
+        ],
+    )
+    .unwrap()
+}
+
+fn generate_string_dictionary(
+    prefix: &str,
+    cardinality: usize,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    let strings: Vec<_> = (0..cardinality)
+        .map(|x| format!("{}#{}", prefix, x))
+        .collect();
+
+    Arc::new(DictionaryArray::<Int32Type>::from_iter((0..len).map(
+        |_| {
+            rng.gen_bool(valid_percent)
+                .then(|| strings[rng.gen_range(0..cardinality)].as_str())
+        },
+    )))
+}
+
+fn generate_strings(
+    string_length_range: Range<usize>,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    Arc::new(StringArray::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent).then(|| {
+            let string_len = rng.gen_range(string_length_range.clone());
+            (0..string_len)
+                .map(|_| char::from(rng.sample(Alphanumeric)))
+                .collect::<String>()
+        })
+    })))
+}
+
+fn generate_primitive<T>(
+    len: usize,
+    valid_percent: f64,
+    range: Range<T::Native>,
+) -> ArrayRef
+where
+    T: ArrowPrimitiveType,
+    T::Native: SampleUniform,
+{
+    let mut rng = thread_rng();
+    Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent)
+            .then(|| rng.gen_range(range.clone()))
+    })))
+}
+
+fn generate_file() -> NamedTempFile {
+    let now = Instant::now();
+    let named_file = tempfile::Builder::new()
+        .prefix("parquet_query_sql")
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    println!("Generating parquet file - {}", named_file.path().display());
+    let schema = schema();
+
+    let properties = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_size(ROW_GROUP_SIZE)
+        .build();
+
+    let file = named_file.as_file().try_clone().unwrap();
+    let mut writer = ArrowWriter::try_new(file, schema, Some(properties)).unwrap();
+
+    for _ in 0..NUM_BATCHES {
+        let batch = generate_batch();
+        writer.write(&batch).unwrap();
+    }
+
+    let metadata = writer.close().unwrap();
+    assert_eq!(
+        metadata.num_rows as usize,
+        WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
+    );
+    assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);
+
+    println!(
+        "Generated parquet file in {} seconds",
+        now.elapsed().as_secs_f32()
+    );
+
+    named_file
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {
+        Ok(file) => (file, None),
+        Err(_) => {
+            let temp_file = generate_file();
+            (temp_file.path().display().to_string(), Some(temp_file))
+        }
+    };
+
+    assert!(Path::new(&file_path).exists(), "path not found");
+    println!("Using parquet file {}", file_path);
+
+    let mut context = ExecutionContext::new();
+
+    let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+    rt.block_on(context.register_parquet("t", file_path.as_str()))
+        .unwrap();
+
+    // We read the queries from a file so they can be changed without recompiling the benchmark
+    let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
+    let mut queries = String::new();
+    queries_file.read_to_string(&mut queries).unwrap();
+
+    for query in queries.split(';') {
+        let query = query.trim();
+
+        // Remove comment lines
+        let query: Vec<_> = query.split('\n').filter(|x| !x.starts_with("--")).collect();
+        let query = query.join(" ");
+
+        // Ignore blank lines
+        if query.is_empty() {
+            continue;
+        }
+
+        let query = query.as_str();
+        c.bench_function(query, |b| {
+            b.iter(|| {
+                let mut context = context.clone();
+                rt.block_on(async move {
+                    let query = context.sql(query).await.unwrap();
+                    let mut stream = query.execute_stream().await.unwrap();
+                    while let Some(_) = criterion::black_box(stream.next().await) {}
+                })
+            });
+        });
+    }
+
+    // Clean up temporary file if any
+    std::mem::drop(temp_file);

Review comment:
       It was intended as a hint that the lifetime of `temp_file` matters, i.e. it must live to the end of the benchmark block. In the past I've accidentally refactored tests that use `NamedTempFile`, and they have broken in odd ways that boiled down to the temporary file getting cleaned up too early.
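
The failure mode described above can be illustrated with the standard library alone. In this sketch `TempGuard` is a hypothetical stand-in for `tempfile::NamedTempFile` (it deletes its file when dropped), and the file names are placeholders; it is not the code from the PR.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for `NamedTempFile`: removes its file on drop.
struct TempGuard {
    path: PathBuf,
}

impl TempGuard {
    fn create(name: &str) -> io::Result<Self> {
        let path = std::env::temp_dir().join(name);
        fs::write(&path, b"placeholder")?;
        Ok(TempGuard { path })
    }

    fn path(&self) -> &Path {
        &self.path
    }
}

impl Drop for TempGuard {
    fn drop(&mut self) {
        // Best-effort cleanup; errors are ignored
        let _ = fs::remove_file(&self.path);
    }
}

/// The accidental refactor: only the path escapes, so the guard is dropped
/// at the end of this function and the file is already gone for the caller.
fn path_only(name: &str) -> io::Result<PathBuf> {
    let guard = TempGuard::create(name)?;
    Ok(guard.path().to_path_buf())
}

/// Keeping the guard alive: the caller owns it, so the file survives until
/// the caller drops it.
fn path_and_guard(name: &str) -> io::Result<(PathBuf, TempGuard)> {
    let guard = TempGuard::create(name)?;
    Ok((guard.path().to_path_buf(), guard))
}

fn main() -> io::Result<()> {
    let lost = path_only("temp_guard_lost.tmp")?;
    println!("path only, file exists: {}", lost.exists());

    let (kept, guard) = path_and_guard("temp_guard_kept.tmp")?;
    println!("guard alive, file exists: {}", kept.exists());
    drop(guard); // explicit drop marks the point where the file may go away
    println!("guard dropped, file exists: {}", kept.exists());
    Ok(())
}
```

Returning the guard alongside the path, as the benchmark does with `Some(temp_file)`, ties the file's lifetime to a value the caller can see.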







[GitHub] [arrow-datafusion] tustvold edited a comment on pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#issuecomment-1040089702


   There are definitely tweaks that would be cool to make to this, e.g. testing different column encodings, but I think this is a decent starting point and is now ready for review





[GitHub] [arrow-datafusion] alamb merged pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738


   





[GitHub] [arrow-datafusion] Igosuki commented on pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#issuecomment-1040640415


   > I also ran this locally :ok_hand: very nice:
   > 
   > ```shell
   > cargo bench --bench parquet_query_sql
   > ...
   > Generating parquet file - /var/folders/s3/h5hgj43j0bv83shtmz_t_w400000gn/T/parquet_query_sqlr7Ymzm.parquet
   > Generated parquet file in 6.7890725 seconds
   > Using parquet file /var/folders/s3/h5hgj43j0bv83shtmz_t_w400000gn/T/parquet_query_sqlr7Ymzm.parquet
   > 
   > 
   > ...
   > ng select dict_10_required, dict_100_required, MIN(f64_optional), MAX(f64_optional), AVG(f64_optional) ...: Warming up for 3.0000 s
   > Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 12.9s, or reduce sample count to 30.
   > Benchmarking select dict_10_required, dict_100_required, MIN(f64_optional), MAX(f64_optional), AVG(f64_optional) ...: Collecting 100 samples in estima
   > select dict_10_required, dict_100_required, MIN(f64_optional), MAX(f64_optional), AVG(f64_optional) ...
   >                         time:   [128.35 ms 128.65 ms 128.98 ms]
   > Found 5 outliers among 100 measurements (5.00%)
   >   3 (3.00%) high mild
   >   2 (2.00%) high severe
   > ```
   > 
   > @Igosuki this might be a cool thing to run on the arrow2 branch to see how the performance compares
   
   I will rebase once it is merged 
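
The figures in the quoted warning appear consistent with one iteration per sample at roughly 129 ms each. The sketch below only redoes that arithmetic from the numbers in the output; it makes no claim about how Criterion computes its suggestions, and both function names are hypothetical.

```rust
/// Time in seconds to collect `samples` samples at one iteration per sample.
fn flat_sampling_time(samples: u32, secs_per_iter: f64) -> f64 {
    f64::from(samples) * secs_per_iter
}

/// How many single-iteration samples fit into `budget_secs`.
fn samples_within(budget_secs: f64, secs_per_iter: f64) -> u32 {
    (budget_secs / secs_per_iter).floor() as u32
}

fn main() {
    // Middle estimate from the quoted output (128.65 ms), in seconds
    let per_iter = 0.12865;
    println!(
        "{:.1} s needed for 100 samples",
        flat_sampling_time(100, per_iter)
    );
    println!(
        "{} samples fit in 5.0 s",
        samples_within(5.0, per_iter)
    );
}
```

About 12.9 s for 100 samples matches the suggested target time, and the suggested sample count of 30 sits below the 38 or so samples that would fit in the default 5 s.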





[GitHub] [arrow-datafusion] alamb commented on pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#issuecomment-1040769614


   I will sort out the clippy complaint
   





[GitHub] [arrow-datafusion] alamb commented on pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#issuecomment-1038114256


   Once https://github.com/apache/arrow-datafusion/pull/1775 merges, we can probably clean up this PR and get it merged





[GitHub] [arrow-datafusion] alamb commented on pull request #1738: Add parquet SQL benchmarks

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1738:
URL: https://github.com/apache/arrow-datafusion/pull/1738#issuecomment-1040625195


   Looks like there is also a clippy complaint here 




