You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/10 11:19:12 UTC
[arrow-datafusion] branch master updated: Create ListingTableConfig which includes file format and schema inference (#1715)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e5f6969 Create ListingTableConfig which includes file format and schema inference (#1715)
e5f6969 is described below
commit e5f69695da103ceab60f8410d129e4e7a2d5bca7
Author: Matthew Turner <ma...@outlook.com>
AuthorDate: Thu Feb 10 06:19:08 2022 -0500
Create ListingTableConfig which includes file format and schema inference (#1715)
* Starting work on ListingTableConfig
* v1 of config
* Refactor for cleaner api
* Update references in rest of code base
* Docs
* Clippy
* Remove async from ListingTable::try_new
* Ballista
* Benchmark fix
* benchs
* Clippy
* Clippy
* Fix empty table bug
* Fix table path
* Get file from directory
* Fix read empty table test
* Remove unnecessary async
* Starting work on ListingTableConfig
* v1 of config
* Refactor for cleaner api
* Update references in rest of code base
* Docs
* Clippy
* Remove async from ListingTable::try_new
* Ballista
* Benchmark fix
* benchs
* Clippy
* Clippy
* Fix empty table bug
* Fix table path
* Get file from directory
* Fix read empty table test
* Remove unnecessary async
* Rebase
* Improve log
* Rebase fix
---
.../rust/core/src/serde/logical_plan/from_proto.rs | 2 +-
ballista/rust/core/src/serde/logical_plan/mod.rs | 13 +-
benchmarks/src/bin/tpch.rs | 24 +--
datafusion/benches/sort_limit_query_sql.rs | 18 +-
datafusion/src/datasource/listing/mod.rs | 2 +-
datafusion/src/datasource/listing/table.rs | 200 +++++++++++++++++----
datafusion/src/execution/context.rs | 7 +-
datafusion/src/logical_plan/builder.rs | 21 ++-
datafusion/tests/path_partition.rs | 15 +-
9 files changed, 222 insertions(+), 80 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 5951376..fb900e1 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -27,7 +27,7 @@ use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::{FileMeta, SizedFile};
use datafusion::logical_plan::window_frames::{
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 31c3308..3166a48 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -27,7 +27,7 @@ use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::logical_plan::plan::Extension;
use datafusion::logical_plan::plan::{
@@ -241,12 +241,11 @@ impl AsLogicalPlan for LogicalPlanNode {
scan.path.as_str()
);
- let provider = ListingTable::new(
- object_store,
- scan.path.clone(),
- Arc::new(schema),
- options,
- );
+ let config = ListingTableConfig::new(object_store, scan.path.as_str())
+ .with_listing_options(options)
+ .with_schema(Arc::new(schema));
+
+ let provider = ListingTable::try_new(config)?;
LogicalPlanBuilder::scan_with_filters(
&scan.table_name,
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 3880ca6..93c44ef 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -51,7 +51,7 @@ use datafusion::{
use datafusion::{
arrow::util::pretty,
datasource::{
- listing::{ListingOptions, ListingTable},
+ listing::{ListingOptions, ListingTable, ListingTableConfig},
object_store::local::LocalFileSystem,
},
};
@@ -724,12 +724,11 @@ fn get_table(
table_partition_cols: vec![],
};
- Ok(Arc::new(ListingTable::new(
- Arc::new(LocalFileSystem {}),
- path,
- schema,
- options,
- )))
+ let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
+ .with_listing_options(options)
+ .with_schema(schema);
+
+ Ok(Arc::new(ListingTable::try_new(config)?))
}
fn get_schema(table: &str) -> Schema {
@@ -1389,12 +1388,13 @@ mod tests {
.has_header(false)
.file_extension(".tbl");
let listing_options = options.to_listing_options(1);
- let provider = ListingTable::new(
+ let config = ListingTableConfig::new(
Arc::new(LocalFileSystem {}),
- format!("{}/{}.tbl", tpch_data_path, table),
- Arc::new(schema),
- listing_options,
- );
+ tpch_data_path.clone(),
+ )
+ .with_listing_options(listing_options)
+ .with_schema(Arc::new(schema));
+ let provider = ListingTable::try_new(config)?;
ctx.register_table(table, Arc::new(provider))?;
}
diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs
index 3828014..2434341 100644
--- a/datafusion/benches/sort_limit_query_sql.rs
+++ b/datafusion/benches/sort_limit_query_sql.rs
@@ -19,7 +19,7 @@
extern crate criterion;
use criterion::Criterion;
use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;
use parking_lot::Mutex;
@@ -63,14 +63,16 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
let testdata = datafusion::test_util::arrow_test_data();
+ let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+
// create CSV data source
let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
- let csv = ListingTable::new(
- Arc::new(LocalFileSystem {}),
- format!("{}/csv/aggregate_test_100.csv", testdata),
- schema,
- listing_options,
- );
+
+ let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path)
+ .with_listing_options(listing_options)
+ .with_schema(schema);
+
+ let csv = async { ListingTable::try_new(config).unwrap() };
let rt = Runtime::new().unwrap();
@@ -85,7 +87,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
ctx.state.lock().config.target_partitions = 1;
let runtime = ctx.state.lock().runtime_env.clone();
- let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime)
+ let mem_table = MemTable::load(Arc::new(csv.await), Some(partitions), runtime)
.await
.unwrap();
ctx.register_table("aggregate_test_100", Arc::new(mem_table))
diff --git a/datafusion/src/datasource/listing/mod.rs b/datafusion/src/datasource/listing/mod.rs
index c8b9241..2f1d034 100644
--- a/datafusion/src/datasource/listing/mod.rs
+++ b/datafusion/src/datasource/listing/mod.rs
@@ -21,4 +21,4 @@
mod helpers;
mod table;
-pub use table::{ListingOptions, ListingTable};
+pub use table::{ListingOptions, ListingTable, ListingTableConfig};
diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs
index 1501b8b..3fbd6c1 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -24,7 +24,11 @@ use async_trait::async_trait;
use futures::StreamExt;
use crate::{
- error::Result,
+ datasource::file_format::avro::AvroFormat,
+ datasource::file_format::csv::CsvFormat,
+ datasource::file_format::json::JsonFormat,
+ datasource::file_format::parquet::ParquetFormat,
+ error::{DataFusionError, Result},
logical_plan::Expr,
physical_plan::{
empty::EmptyExec,
@@ -40,7 +44,124 @@ use crate::datasource::{
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
+/// Configuration for creating a `ListingTable`
+pub struct ListingTableConfig {
+ /// `ObjectStore` that contains the files for the `ListingTable`.
+ pub object_store: Arc<dyn ObjectStore>,
+ /// Path on the `ObjectStore` for creating `ListingTable`.
+ pub table_path: String,
+ /// Optional `SchemaRef` for the to be created `ListingTable`.
+ pub file_schema: Option<SchemaRef>,
+ /// Optional `ListingOptions` for the to be created `ListingTable`.
+ pub options: Option<ListingOptions>,
+}
+
+impl ListingTableConfig {
+ /// Creates new `ListingTableConfig` with no `SchemaRef` or `ListingOptions` set. Provide them with `with_schema` / `with_listing_options`, or derive them with `infer`.
+ pub fn new(
+ object_store: Arc<dyn ObjectStore>,
+ table_path: impl Into<String>,
+ ) -> Self {
+ Self {
+ object_store,
+ table_path: table_path.into(),
+ file_schema: None,
+ options: None,
+ }
+ }
+ /// Add `schema` to `ListingTableConfig`
+ pub fn with_schema(self, schema: SchemaRef) -> Self {
+ Self {
+ object_store: self.object_store,
+ table_path: self.table_path,
+ file_schema: Some(schema),
+ options: self.options,
+ }
+ }
+
+ /// Add `listing_options` to `ListingTableConfig`
+ pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
+ Self {
+ object_store: self.object_store,
+ table_path: self.table_path,
+ file_schema: self.file_schema,
+ options: Some(listing_options),
+ }
+ }
+
+ fn infer_format(suffix: &str) -> Result<Arc<dyn FileFormat>> {
+ match suffix {
+ "avro" => Ok(Arc::new(AvroFormat::default())),
+ "csv" => Ok(Arc::new(CsvFormat::default())),
+ "json" => Ok(Arc::new(JsonFormat::default())),
+ "parquet" => Ok(Arc::new(ParquetFormat::default())),
+ _ => Err(DataFusionError::Internal(format!(
+ "Unable to infer file type from suffix {}",
+ suffix
+ ))),
+ }
+ }
+
+ /// Infer `ListingOptions` based on the suffix of the first file listed under `table_path`.
+ pub async fn infer_options(self) -> Result<Self> {
+ let mut files = self.object_store.list_file(&self.table_path).await?;
+ let file = files
+ .next()
+ .await
+ .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
+
+ let tokens: Vec<&str> = file.path().split('.').collect();
+ let file_type = tokens.last().ok_or_else(|| {
+ DataFusionError::Internal("Unable to infer file suffix".into())
+ })?;
+
+ let format = ListingTableConfig::infer_format(*file_type)?;
+
+ let listing_options = ListingOptions {
+ format,
+ collect_stat: true,
+ file_extension: file_type.to_string(),
+ target_partitions: num_cpus::get(),
+ table_partition_cols: vec![],
+ };
+
+ Ok(Self {
+ object_store: self.object_store,
+ table_path: self.table_path,
+ file_schema: self.file_schema,
+ options: Some(listing_options),
+ })
+ }
+
+ /// Infer `SchemaRef` by calling `ListingOptions::infer_schema` on `table_path`. Requires `self.options` to be set prior to using.
+ pub async fn infer_schema(self) -> Result<Self> {
+ match self.options {
+ Some(options) => {
+ let schema = options
+ .infer_schema(self.object_store.clone(), self.table_path.as_str())
+ .await?;
+
+ Ok(Self {
+ object_store: self.object_store,
+ table_path: self.table_path,
+ file_schema: Some(schema),
+ options: Some(options),
+ })
+ }
+ None => Err(DataFusionError::Internal(
+ "No `ListingOptions` set for inferring schema".into(),
+ )),
+ }
+ }
+
+ /// Convenience wrapper for calling `infer_options` and `infer_schema`
+ pub async fn infer(self) -> Result<Self> {
+ self.infer_options().await?.infer_schema().await
+ }
+}
+
/// Options for creating a `ListingTable`
+#[derive(Clone)]
pub struct ListingOptions {
/// A suffix on which files should be filtered (leave empty to
/// keep all files on the path)
@@ -115,15 +236,21 @@ pub struct ListingTable {
impl ListingTable {
/// Create new table that lists the FS to get the files to scan.
- /// The provided `schema` must be resolved before creating the table
+ /// Takes a `ListingTableConfig` as input which requires an `ObjectStore` and `table_path`.
+ /// `ListingOptions` and `SchemaRef` are optional on the config, but both must be set
+ /// (explicitly or via `ListingTableConfig::infer`) before calling this, otherwise an error is returned.
+ /// If the schema is provided then it must be resolved before creating the table
/// and should contain the fields of the file without the table
/// partitioning columns.
- pub fn new(
- object_store: Arc<dyn ObjectStore>,
- table_path: String,
- file_schema: SchemaRef,
- options: ListingOptions,
- ) -> Self {
+ pub fn try_new(config: ListingTableConfig) -> Result<Self> {
+ let file_schema = config
+ .file_schema
+ .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
+
+ let options = config.options.ok_or_else(|| {
+ DataFusionError::Internal("No ListingOptions provided".into())
+ })?;
+
// Add the partition columns to the file schema
let mut table_fields = file_schema.fields().clone();
for part in &options.table_partition_cols {
@@ -134,13 +261,15 @@ impl ListingTable {
));
}
- Self {
- object_store,
- table_path,
+ let table = Self {
+ object_store: config.object_store.clone(),
+ table_path: config.table_path.clone(),
file_schema,
table_schema: Arc::new(Schema::new(table_fields)),
options,
- }
+ };
+
+ Ok(table)
}
/// Get object store ref
@@ -264,10 +393,7 @@ impl ListingTable {
#[cfg(test)]
mod tests {
- use arrow::datatypes::DataType;
-
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
- use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use crate::{
datasource::{
file_format::{avro::AvroFormat, parquet::ParquetFormat},
@@ -276,6 +402,7 @@ mod tests {
logical_plan::{col, lit},
test::{columns, object_store::TestObjectStore},
};
+ use arrow::datatypes::DataType;
use super::*;
@@ -306,8 +433,10 @@ mod tests {
let schema = opt
.infer_schema(Arc::new(LocalFileSystem {}), &filename)
.await?;
- let table =
- ListingTable::new(Arc::new(LocalFileSystem {}), filename, schema, opt);
+ let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename)
+ .with_listing_options(opt)
+ .with_schema(schema);
+ let table = ListingTable::try_new(config)?;
let exec = table.scan(&None, &[], None).await?;
assert_eq!(exec.statistics().num_rows, Some(8));
assert_eq!(exec.statistics().total_byte_size, Some(671));
@@ -317,7 +446,8 @@ mod tests {
#[tokio::test]
async fn read_empty_table() -> Result<()> {
- let store = TestObjectStore::new_arc(&[("table/p1=v1/file.avro", 100)]);
+ let path = String::from("table/p1=v1/file.avro");
+ let store = TestObjectStore::new_arc(&[(&path, 100)]);
let opt = ListingOptions {
file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
@@ -327,10 +457,13 @@ mod tests {
collect_stat: true,
};
- let file_schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+ let file_schema =
+ Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
+ let config = ListingTableConfig::new(store, "table/")
+ .with_listing_options(opt)
+ .with_schema(file_schema);
+ let table = ListingTable::try_new(config)?;
- let table =
- ListingTable::new(store, "table/".to_owned(), Arc::new(file_schema), opt);
assert_eq!(
columns(&table.schema()),
vec!["a".to_owned(), "p1".to_owned()]
@@ -420,20 +553,10 @@ mod tests {
async fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
- let opt = ListingOptions {
- file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
- format: Arc::new(ParquetFormat::default()),
- table_partition_cols: vec![],
- target_partitions: 2,
- collect_stat: true,
- };
- // here we resolve the schema locally
- let schema = opt
- .infer_schema(Arc::new(LocalFileSystem {}), &filename)
- .await
- .expect("Infer schema");
- let table =
- ListingTable::new(Arc::new(LocalFileSystem {}), filename, schema, opt);
+ let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename)
+ .infer()
+ .await?;
+ let table = ListingTable::try_new(config)?;
Ok(Arc::new(table))
}
@@ -460,8 +583,11 @@ mod tests {
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
- let table =
- ListingTable::new(mock_store, table_prefix.to_owned(), Arc::new(schema), opt);
+ let config = ListingTableConfig::new(mock_store, table_prefix.to_owned())
+ .with_listing_options(opt)
+ .with_schema(Arc::new(schema));
+
+ let table = ListingTable::try_new(config)?;
let (file_list, _) = table.list_files_for_scan(&[], None).await?;
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index d4f1ca4..393e4af 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -56,6 +56,7 @@ use crate::catalog::{
schema::{MemorySchemaProvider, SchemaProvider},
ResolvedTableReference, TableReference,
};
+use crate::datasource::listing::ListingTableConfig;
use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
@@ -455,8 +456,10 @@ impl ExecutionContext {
}
Some(s) => s,
};
- let table =
- ListingTable::new(object_store, path.to_owned(), resolved_schema, options);
+ let config = ListingTableConfig::new(object_store, path)
+ .with_listing_options(options)
+ .with_schema(resolved_schema);
+ let table = ListingTable::try_new(config)?;
self.register_table(name, Arc::new(table))?;
Ok(())
}
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index a722238..0144b75 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -20,7 +20,7 @@
use crate::datasource::{
empty::EmptyTable,
file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
- listing::{ListingOptions, ListingTable},
+ listing::{ListingOptions, ListingTable, ListingTableConfig},
object_store::ObjectStore,
MemTable, TableProvider,
};
@@ -243,8 +243,10 @@ impl LogicalPlanBuilder {
.await?
}
};
- let provider =
- ListingTable::new(object_store, path, resolved_schema, listing_options);
+ let config = ListingTableConfig::new(object_store, path)
+ .with_listing_options(listing_options)
+ .with_schema(resolved_schema);
+ let provider = ListingTable::try_new(config)?;
Self::scan(table_name, Arc::new(provider), projection)
}
@@ -293,8 +295,11 @@ impl LogicalPlanBuilder {
.infer_schema(Arc::clone(&object_store), &path)
.await?;
- let provider =
- ListingTable::new(object_store, path, resolved_schema, listing_options);
+ let config = ListingTableConfig::new(object_store, path)
+ .with_listing_options(listing_options)
+ .with_schema(resolved_schema);
+
+ let provider = ListingTable::try_new(config)?;
Self::scan(table_name, Arc::new(provider), projection)
}
@@ -339,8 +344,10 @@ impl LogicalPlanBuilder {
.await?
}
};
- let provider =
- ListingTable::new(object_store, path, resolved_schema, listing_options);
+ let config = ListingTableConfig::new(object_store, path)
+ .with_listing_options(listing_options)
+ .with_schema(resolved_schema);
+ let provider = ListingTable::try_new(config)?;
Self::scan(table_name, Arc::new(provider), projection)
}
diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs
index e68ef32..178e318 100644
--- a/datafusion/tests/path_partition.rs
+++ b/datafusion/tests/path_partition.rs
@@ -24,7 +24,7 @@ use datafusion::{
assert_batches_sorted_eq,
datasource::{
file_format::{csv::CsvFormat, parquet::ParquetFormat},
- listing::{ListingOptions, ListingTable},
+ listing::{ListingOptions, ListingTable, ListingTableConfig},
object_store::{
local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream,
ObjectReader, ObjectStore, SizedFile,
@@ -285,8 +285,10 @@ fn register_partitioned_aggregate_csv(
let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
- let table =
- ListingTable::new(object_store, table_path.to_owned(), file_schema, options);
+ let config = ListingTableConfig::new(object_store, table_path)
+ .with_listing_options(options)
+ .with_schema(file_schema);
+ let table = ListingTable::try_new(config).unwrap();
ctx.register_table("t", Arc::new(table))
.expect("registering listing table failed");
@@ -313,8 +315,11 @@ async fn register_partitioned_alltypes_parquet(
.await
.expect("Parquet schema inference failed");
- let table =
- ListingTable::new(object_store, table_path.to_owned(), file_schema, options);
+ let config = ListingTableConfig::new(object_store, table_path)
+ .with_listing_options(options)
+ .with_schema(file_schema);
+
+ let table = ListingTable::try_new(config).unwrap();
ctx.register_table("t", Arc::new(table))
.expect("registering listing table failed");