You are viewing a plain-text version of this content. The canonical link to it was a hyperlink labeled "here", which is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/05 19:55:27 UTC
[arrow-datafusion] branch master updated: Handle trailing tbl column in TPCH benchmarks (#4821)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b410fce10 Handle trailing tbl column in TPCH benchmarks (#4821)
b410fce10 is described below
commit b410fce104435d2aeb75faf6dc53b00b88101b04
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jan 5 19:55:21 2023 +0000
Handle trailing tbl column in TPCH benchmarks (#4821)
* Handle trailing tbl column in TPCH benchmarks
* Clippy
* Fix benchmarks
---
benchmarks/src/bin/tpch.rs | 15 ++++++++-------
benchmarks/src/tpch.rs | 21 ++++++++++++++++++++-
2 files changed, 28 insertions(+), 8 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index bff7999cd..63eb9127e 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -402,7 +402,6 @@ async fn get_table(
unimplemented!("Invalid file format '{}'", other);
}
};
- let schema = Arc::new(get_tpch_table_schema(table));
let options = ListingOptions::new(format)
.with_file_extension(extension)
@@ -412,10 +411,11 @@ async fn get_table(
let table_path = ListingTableUrl::parse(path)?;
let config = ListingTableConfig::new(table_path).with_listing_options(options);
- let config = if table_format == "parquet" {
- config.infer_schema(&state).await?
- } else {
- config.with_schema(schema)
+ let config = match table_format {
+ "parquet" => config.infer_schema(&state).await?,
+ "tbl" => config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
+ "csv" => config.with_schema(Arc::new(get_tpch_table_schema(table))),
+ _ => unreachable!(),
};
Ok(Arc::new(ListingTable::try_new(config)?))
@@ -827,6 +827,7 @@ mod tests {
#[cfg(feature = "ci")]
mod ci {
use super::*;
+ use arrow::datatypes::{DataType, Field};
use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
async fn serde_round_trip(query: usize) -> Result<()> {
@@ -1086,7 +1087,6 @@ mod ci {
/// * the correct number of rows are returned
/// * the content of the rows is correct
async fn verify_query(n: usize) -> Result<()> {
- use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::ScalarValue;
use datafusion::logical_expr::expr::Cast;
@@ -1214,7 +1214,8 @@ mod ci {
}
fn get_tpch_data_path() -> Result<String> {
- let path = std::env::var("TPCH_DATA").unwrap_or("benchmarks/data".to_string());
+ let path =
+ std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string());
if !Path::new(&path).exists() {
return Err(DataFusionError::Execution(format!(
"Benchmark data not found (set TPCH_DATA env var to override): {}",
diff --git a/benchmarks/src/tpch.rs b/benchmarks/src/tpch.rs
index deaecdd93..47a69f62d 100644
--- a/benchmarks/src/tpch.rs
+++ b/benchmarks/src/tpch.rs
@@ -43,6 +43,15 @@ pub const TPCH_TABLES: &[&str] = &[
"part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region",
];
+/// The `.tbl` file contains a trailing column
+pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
+ let mut schema = get_tpch_table_schema(table);
+ schema
+ .fields
+ .push(Field::new("__placeholder", DataType::Utf8, false));
+ schema
+}
+
/// Get the schema for the benchmarks derived from TPC-H
pub fn get_tpch_table_schema(table: &str) -> Schema {
// note that the schema intentionally uses signed integers so that any generated Parquet
@@ -331,7 +340,7 @@ pub async fn convert_tbl(
let output_root_path = Path::new(output_path);
for table in TPCH_TABLES {
let start = Instant::now();
- let schema = get_tpch_table_schema(table);
+ let schema = get_tbl_tpch_table_schema(table);
let input_path = format!("{input_path}/{table}.tbl");
let options = CsvReadOptions::new()
@@ -346,6 +355,16 @@ pub async fn convert_tbl(
// build plan to read the TBL file
let mut csv = ctx.read_csv(&input_path, options).await?;
+ // Select all apart from the padding column
+ let selection = csv
+ .schema()
+ .fields()
+ .iter()
+ .take(schema.fields.len() - 1)
+ .map(|d| Expr::Column(d.qualified_column()))
+ .collect();
+
+ csv = csv.select(selection)?;
// optionally, repartition the file
if partitions > 1 {
csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?