You are viewing a plain text version of this content. The canonical version is available in the original mailing-list archive (the link was not preserved in this copy).
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/05 19:55:27 UTC

[arrow-datafusion] branch master updated: Handle trailing tbl column in TPCH benchmarks (#4821)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b410fce10 Handle trailing tbl column in TPCH benchmarks (#4821)
b410fce10 is described below

commit b410fce104435d2aeb75faf6dc53b00b88101b04
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jan 5 19:55:21 2023 +0000

    Handle trailing tbl column in TPCH benchmarks (#4821)
    
    * Handle trailing tbl column in TPCH benchmarks
    
    * Clippy
    
    * Fix benchmarks
---
 benchmarks/src/bin/tpch.rs | 15 ++++++++-------
 benchmarks/src/tpch.rs     | 21 ++++++++++++++++++++-
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index bff7999cd..63eb9127e 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -402,7 +402,6 @@ async fn get_table(
                 unimplemented!("Invalid file format '{}'", other);
             }
         };
-    let schema = Arc::new(get_tpch_table_schema(table));
 
     let options = ListingOptions::new(format)
         .with_file_extension(extension)
@@ -412,10 +411,11 @@ async fn get_table(
     let table_path = ListingTableUrl::parse(path)?;
     let config = ListingTableConfig::new(table_path).with_listing_options(options);
 
-    let config = if table_format == "parquet" {
-        config.infer_schema(&state).await?
-    } else {
-        config.with_schema(schema)
+    let config = match table_format {
+        "parquet" => config.infer_schema(&state).await?,
+        "tbl" => config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
+        "csv" => config.with_schema(Arc::new(get_tpch_table_schema(table))),
+        _ => unreachable!(),
     };
 
     Ok(Arc::new(ListingTable::try_new(config)?))
@@ -827,6 +827,7 @@ mod tests {
 #[cfg(feature = "ci")]
 mod ci {
     use super::*;
+    use arrow::datatypes::{DataType, Field};
     use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
 
     async fn serde_round_trip(query: usize) -> Result<()> {
@@ -1086,7 +1087,6 @@ mod ci {
     ///  * the correct number of rows are returned
     ///  * the content of the rows is correct
     async fn verify_query(n: usize) -> Result<()> {
-        use datafusion::arrow::datatypes::{DataType, Field};
         use datafusion::common::ScalarValue;
         use datafusion::logical_expr::expr::Cast;
 
@@ -1214,7 +1214,8 @@ mod ci {
     }
 
     fn get_tpch_data_path() -> Result<String> {
-        let path = std::env::var("TPCH_DATA").unwrap_or("benchmarks/data".to_string());
+        let path =
+            std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string());
         if !Path::new(&path).exists() {
             return Err(DataFusionError::Execution(format!(
                 "Benchmark data not found (set TPCH_DATA env var to override): {}",
diff --git a/benchmarks/src/tpch.rs b/benchmarks/src/tpch.rs
index deaecdd93..47a69f62d 100644
--- a/benchmarks/src/tpch.rs
+++ b/benchmarks/src/tpch.rs
@@ -43,6 +43,15 @@ pub const TPCH_TABLES: &[&str] = &[
     "part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region",
 ];
 
+/// Returns [`get_tpch_table_schema`] plus a `__placeholder` column, as `.tbl` rows carry a trailing padding column
+pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
+    let mut schema = get_tpch_table_schema(table);
+    schema
+        .fields
+        .push(Field::new("__placeholder", DataType::Utf8, false));
+    schema
+}
+
 /// Get the schema for the benchmarks derived from TPC-H
 pub fn get_tpch_table_schema(table: &str) -> Schema {
     // note that the schema intentionally uses signed integers so that any generated Parquet
@@ -331,7 +340,7 @@ pub async fn convert_tbl(
     let output_root_path = Path::new(output_path);
     for table in TPCH_TABLES {
         let start = Instant::now();
-        let schema = get_tpch_table_schema(table);
+        let schema = get_tbl_tpch_table_schema(table);
 
         let input_path = format!("{input_path}/{table}.tbl");
         let options = CsvReadOptions::new()
@@ -346,6 +355,16 @@ pub async fn convert_tbl(
         // build plan to read the TBL file
         let mut csv = ctx.read_csv(&input_path, options).await?;
 
+        // Select all apart from the padding column
+        let selection = csv
+            .schema()
+            .fields()
+            .iter()
+            .take(schema.fields.len() - 1)
+            .map(|d| Expr::Column(d.qualified_column()))
+            .collect();
+
+        csv = csv.select(selection)?;
         // optionally, repartition the file
         if partitions > 1 {
             csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?