You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/09 10:07:59 UTC

[arrow-datafusion] 01/01: Cleanup tpch benchmark

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch cleanup_tpch
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 9de450ced5b231e0285fbce62f36a375c71d0c20
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Fri Jun 9 12:07:46 2023 +0200

    Cleanup tpch benchmark
---
 benchmarks/src/tpch.rs | 285 -------------------------------------------------
 testing                |   2 +-
 2 files changed, 1 insertion(+), 286 deletions(-)

diff --git a/benchmarks/src/tpch.rs b/benchmarks/src/tpch.rs
index 72f5907b07..f63b05faae 100644
--- a/benchmarks/src/tpch.rs
+++ b/benchmarks/src/tpch.rs
@@ -147,155 +147,6 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
     }
 }
 
-/// Get the expected schema for the results of a query
-pub fn get_answer_schema(n: usize) -> Schema {
-    match n {
-        1 => Schema::new(vec![
-            Field::new("l_returnflag", DataType::Utf8, true),
-            Field::new("l_linestatus", DataType::Utf8, true),
-            Field::new("sum_qty", DataType::Decimal128(15, 2), true),
-            Field::new("sum_base_price", DataType::Decimal128(15, 2), true),
-            Field::new("sum_disc_price", DataType::Decimal128(15, 2), true),
-            Field::new("sum_charge", DataType::Decimal128(15, 2), true),
-            Field::new("avg_qty", DataType::Decimal128(15, 2), true),
-            Field::new("avg_price", DataType::Decimal128(15, 2), true),
-            Field::new("avg_disc", DataType::Decimal128(15, 2), true),
-            Field::new("count_order", DataType::Int64, true),
-        ]),
-
-        2 => Schema::new(vec![
-            Field::new("s_acctbal", DataType::Decimal128(15, 2), true),
-            Field::new("s_name", DataType::Utf8, true),
-            Field::new("n_name", DataType::Utf8, true),
-            Field::new("p_partkey", DataType::Int64, true),
-            Field::new("p_mfgr", DataType::Utf8, true),
-            Field::new("s_address", DataType::Utf8, true),
-            Field::new("s_phone", DataType::Utf8, true),
-            Field::new("s_comment", DataType::Utf8, true),
-        ]),
-
-        3 => Schema::new(vec![
-            Field::new("l_orderkey", DataType::Int64, true),
-            Field::new("revenue", DataType::Decimal128(15, 2), true),
-            Field::new("o_orderdate", DataType::Date32, true),
-            Field::new("o_shippriority", DataType::Int32, true),
-        ]),
-
-        4 => Schema::new(vec![
-            Field::new("o_orderpriority", DataType::Utf8, true),
-            Field::new("order_count", DataType::Int64, true),
-        ]),
-
-        5 => Schema::new(vec![
-            Field::new("n_name", DataType::Utf8, true),
-            Field::new("revenue", DataType::Decimal128(15, 2), true),
-        ]),
-
-        6 => Schema::new(vec![Field::new(
-            "revenue",
-            DataType::Decimal128(15, 2),
-            true,
-        )]),
-
-        7 => Schema::new(vec![
-            Field::new("supp_nation", DataType::Utf8, true),
-            Field::new("cust_nation", DataType::Utf8, true),
-            Field::new("l_year", DataType::Float64, true),
-            Field::new("revenue", DataType::Decimal128(15, 2), true),
-        ]),
-
-        8 => Schema::new(vec![
-            Field::new("o_year", DataType::Float64, true),
-            Field::new("mkt_share", DataType::Decimal128(15, 2), true),
-        ]),
-
-        9 => Schema::new(vec![
-            Field::new("nation", DataType::Utf8, true),
-            Field::new("o_year", DataType::Float64, true),
-            Field::new("sum_profit", DataType::Decimal128(15, 2), true),
-        ]),
-
-        10 => Schema::new(vec![
-            Field::new("c_custkey", DataType::Int64, true),
-            Field::new("c_name", DataType::Utf8, true),
-            Field::new("revenue", DataType::Decimal128(15, 2), true),
-            Field::new("c_acctbal", DataType::Decimal128(15, 2), true),
-            Field::new("n_name", DataType::Utf8, true),
-            Field::new("c_address", DataType::Utf8, true),
-            Field::new("c_phone", DataType::Utf8, true),
-            Field::new("c_comment", DataType::Utf8, true),
-        ]),
-
-        11 => Schema::new(vec![
-            Field::new("ps_partkey", DataType::Int64, true),
-            Field::new("value", DataType::Decimal128(15, 2), true),
-        ]),
-
-        12 => Schema::new(vec![
-            Field::new("l_shipmode", DataType::Utf8, true),
-            Field::new("high_line_count", DataType::Int64, true),
-            Field::new("low_line_count", DataType::Int64, true),
-        ]),
-
-        13 => Schema::new(vec![
-            Field::new("c_count", DataType::Int64, true),
-            Field::new("custdist", DataType::Int64, true),
-        ]),
-
-        14 => Schema::new(vec![Field::new("promo_revenue", DataType::Float64, true)]),
-
-        15 => Schema::new(vec![
-            Field::new("s_suppkey", DataType::Int64, true),
-            Field::new("s_name", DataType::Utf8, true),
-            Field::new("s_address", DataType::Utf8, true),
-            Field::new("s_phone", DataType::Utf8, true),
-            Field::new("total_revenue", DataType::Decimal128(15, 2), true),
-        ]),
-
-        16 => Schema::new(vec![
-            Field::new("p_brand", DataType::Utf8, true),
-            Field::new("p_type", DataType::Utf8, true),
-            Field::new("p_size", DataType::Int32, true),
-            Field::new("supplier_cnt", DataType::Int64, true),
-        ]),
-
-        17 => Schema::new(vec![Field::new("avg_yearly", DataType::Float64, true)]),
-
-        18 => Schema::new(vec![
-            Field::new("c_name", DataType::Utf8, true),
-            Field::new("c_custkey", DataType::Int64, true),
-            Field::new("o_orderkey", DataType::Int64, true),
-            Field::new("o_orderdate", DataType::Date32, true),
-            Field::new("o_totalprice", DataType::Decimal128(15, 2), true),
-            Field::new("sum_l_quantity", DataType::Decimal128(15, 2), true),
-        ]),
-
-        19 => Schema::new(vec![Field::new(
-            "revenue",
-            DataType::Decimal128(15, 2),
-            true,
-        )]),
-
-        20 => Schema::new(vec![
-            Field::new("s_name", DataType::Utf8, true),
-            Field::new("s_address", DataType::Utf8, true),
-        ]),
-
-        21 => Schema::new(vec![
-            Field::new("s_name", DataType::Utf8, true),
-            Field::new("numwait", DataType::Int64, true),
-        ]),
-
-        22 => Schema::new(vec![
-            Field::new("cntrycode", DataType::Utf8, true),
-            Field::new("numcust", DataType::Int64, true),
-            Field::new("totacctbal", DataType::Decimal128(15, 2), true),
-        ]),
-
-        _ => unimplemented!(),
-    }
-}
-
 /// Get the SQL statements from the specified query file
 pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
     if query > 0 && query < 23 {
@@ -399,142 +250,6 @@ pub async fn convert_tbl(
     Ok(())
 }
 
-/// Converts the results into a 2d array of strings, `result[row][column]`
-/// Special cases nulls to NULL for testing
-pub fn result_vec(results: &[RecordBatch]) -> Vec<Vec<ScalarValue>> {
-    let mut result = vec![];
-    for batch in results {
-        for row_index in 0..batch.num_rows() {
-            let row_vec = batch
-                .columns()
-                .iter()
-                .map(|column| col_to_scalar(column, row_index))
-                .collect();
-            result.push(row_vec);
-        }
-    }
-    result
-}
-
-/// convert expected schema to all utf8 so columns can be read as strings to be parsed separately
-/// this is due to the fact that the csv parser cannot handle leading/trailing spaces
-pub fn string_schema(schema: Schema) -> Schema {
-    Schema::new(
-        schema
-            .fields()
-            .iter()
-            .map(|field| {
-                Field::new(
-                    Field::name(field),
-                    DataType::Utf8,
-                    Field::is_nullable(field),
-                )
-            })
-            .collect::<Vec<Field>>(),
-    )
-}
-
-fn col_to_scalar(column: &ArrayRef, row_index: usize) -> ScalarValue {
-    if column.is_null(row_index) {
-        return ScalarValue::Null;
-    }
-    match column.data_type() {
-        DataType::Int32 => {
-            let array = as_int32_array(column).unwrap();
-            ScalarValue::Int32(Some(array.value(row_index)))
-        }
-        DataType::Int64 => {
-            let array = as_int64_array(column).unwrap();
-            ScalarValue::Int64(Some(array.value(row_index)))
-        }
-        DataType::Float64 => {
-            let array = as_float64_array(column).unwrap();
-            ScalarValue::Float64(Some(array.value(row_index)))
-        }
-        DataType::Decimal128(p, s) => {
-            let array = as_decimal128_array(column).unwrap();
-            ScalarValue::Decimal128(Some(array.value(row_index)), *p, *s)
-        }
-        DataType::Date32 => {
-            let array = as_date32_array(column).unwrap();
-            ScalarValue::Date32(Some(array.value(row_index)))
-        }
-        DataType::Utf8 => {
-            let array = as_string_array(column).unwrap();
-            ScalarValue::Utf8(Some(array.value(row_index).to_string()))
-        }
-        other => panic!("unexpected data type in benchmark: {other}"),
-    }
-}
-
-pub async fn transform_actual_result(
-    result: Vec<RecordBatch>,
-    n: usize,
-) -> Result<Vec<RecordBatch>> {
-    // to compare the recorded answers to the answers we got back from running the query,
-    // we need to round the decimal columns and trim the Utf8 columns
-    // we also need to rewrite the batches to use a compatible schema
-    let ctx = SessionContext::new();
-    let fields: Fields = result[0]
-        .schema()
-        .fields()
-        .iter()
-        .map(|f| {
-            let simple_name = match f.name().find('.') {
-                Some(i) => f.name()[i + 1..].to_string(),
-                _ => f.name().to_string(),
-            };
-            f.as_ref().clone().with_name(simple_name)
-        })
-        .collect();
-    let result_schema = SchemaRef::new(Schema::new(fields));
-    let result = result
-        .iter()
-        .map(|b| {
-            RecordBatch::try_new(result_schema.clone(), b.columns().to_vec())
-                .map_err(|e| e.into())
-        })
-        .collect::<Result<Vec<_>>>()?;
-    let table = Arc::new(MemTable::try_new(result_schema.clone(), vec![result])?);
-    let mut df = ctx.read_table(table)?.select(
-        result_schema
-            .fields
-            .iter()
-            .map(|field| {
-                match field.data_type() {
-                    DataType::Decimal128(_, _) => {
-                        // if decimal, then round it to 2 decimal places like the answers
-                        // round() doesn't support the second argument for decimal places to round to
-                        // this can be simplified to remove the mul and div when
-                        // https://github.com/apache/arrow-datafusion/issues/2420 is completed
-                        // cast it back to an over-sized Decimal with 2 precision when done rounding
-                        let round = Box::new(
-                            Expr::ScalarFunction(ScalarFunction::new(
-                                datafusion::logical_expr::BuiltinScalarFunction::Round,
-                                vec![col(Field::name(field)).mul(lit(100))],
-                            ))
-                            .div(lit(100)),
-                        );
-                        Expr::Cast(Cast::new(round, DataType::Decimal128(15, 2)))
-                            .alias(field.name())
-                    }
-                    DataType::Utf8 => {
-                        // if string, then trim it like the answers got trimmed
-                        trim(col(Field::name(field))).alias(field.name())
-                    }
-                    _ => col(field.name()),
-                }
-            })
-            .collect(),
-    )?;
-    if let Some(x) = QUERY_LIMIT[n - 1] {
-        df = df.limit(0, Some(x))?;
-    }
-
-    let df = df.collect().await?;
-    Ok(df)
-}
-
 pub const QUERY_LIMIT: [Option<usize>; 22] = [
     None,
     Some(100),
diff --git a/testing b/testing
index e81d0c6de3..5bab2f264a 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88