Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/23 17:58:40 UTC
[arrow-datafusion] branch master updated: Support types other than String for partition columns on ListingTables (#4221)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 55bf8e9c2 Support types other than String for partition columns on ListingTables (#4221)
55bf8e9c2 is described below
commit 55bf8e9c2ca6d358e7dc4c37e1439de38008f960
Author: Jie Han <11...@users.noreply.github.com>
AuthorDate: Thu Nov 24 01:58:33 2022 +0800
Support types other than String for partition columns on ListingTables (#4221)
* partition columns config
* solve test compile problems
* fix ut
* add partition column types config for each file reader options
* get correct partition column types from partitioned files
* remove DEFAULT_PARTITION_COLUMN_TYPE
* update pruned_partition_list
* change api
* add some tests
* create partitioned external table with schema
* upd
* Update datafusion/core/src/datasource/listing_table_factory.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* code clean
* code clean
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/core/src/datasource/listing/helpers.rs | 77 ++++++++++---
datafusion/core/src/datasource/listing/table.rs | 61 +++++++---
.../core/src/datasource/listing_table_factory.rs | 38 ++++++-
datafusion/core/src/datasource/memory.rs | 1 +
datafusion/core/src/execution/options.rs | 34 +++---
.../core/src/physical_plan/file_format/avro.rs | 6 +-
.../core/src/physical_plan/file_format/csv.rs | 4 +-
.../src/physical_plan/file_format/file_stream.rs | 6 +-
.../core/src/physical_plan/file_format/mod.rs | 41 ++++---
.../core/src/physical_plan/file_format/parquet.rs | 7 +-
datafusion/core/tests/path_partition.rs | 123 +++++++++++++++++++--
datafusion/core/tests/sql/create_drop.rs | 68 ++++++++++++
datafusion/proto/src/logical_plan.rs | 22 +++-
13 files changed, 399 insertions(+), 89 deletions(-)
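The net effect of the change is visible in the ListingOptions API below: partition columns are now declared as (name, DataType) pairs instead of bare names, so a column such as "year" can be typed as Int32 rather than always Utf8. A minimal sketch of the updated builder call, based on the signatures in this diff (the format and column choices here are illustrative, not part of the commit):

    use std::sync::Arc;
    use arrow::datatypes::DataType;
    use datafusion::datasource::file_format::parquet::ParquetFormat;
    use datafusion::datasource::listing::ListingOptions;

    // Partition columns now carry an explicit logical type; before this
    // commit the builder accepted Vec<String> and every partition column
    // was read back as a (dictionary-encoded) string.
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_table_partition_cols(vec![
            ("year".to_string(), DataType::Int32),
            ("month".to_string(), DataType::Utf8),
        ]);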
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index f1a34e665..3cfe9ec14 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -19,6 +19,7 @@
use std::sync::Arc;
+use arrow::array::new_empty_array;
use arrow::{
array::{
Array, ArrayBuilder, ArrayRef, Date64Array, Date64Builder, StringBuilder,
@@ -176,7 +177,7 @@ pub async fn pruned_partition_list<'a>(
table_path: &'a ListingTableUrl,
filters: &'a [Expr],
file_extension: &'a str,
- table_partition_cols: &'a [String],
+ table_partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
let list = table_path.list_all_files(store, file_extension);
@@ -187,7 +188,15 @@ pub async fn pruned_partition_list<'a>(
let applicable_filters: Vec<_> = filters
.iter()
- .filter(|f| expr_applicable_for_cols(table_partition_cols, f))
+ .filter(|f| {
+ expr_applicable_for_cols(
+ &table_partition_cols
+ .iter()
+ .map(|x| x.0.clone())
+ .collect::<Vec<_>>(),
+ f,
+ )
+ })
.collect();
if applicable_filters.is_empty() {
@@ -200,11 +209,26 @@ pub async fn pruned_partition_list<'a>(
let parsed_path = parse_partitions_for_path(
table_path,
&object_meta.location,
- table_partition_cols,
+ &table_partition_cols
+ .iter()
+ .map(|x| x.0.clone())
+ .collect::<Vec<_>>(),
)
.map(|p| {
p.iter()
- .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
+ .zip(table_partition_cols)
+ .map(|(&part_value, part_column)| {
+ ScalarValue::try_from_string(
+ part_value.to_string(),
+ &part_column.1,
+ )
+ .unwrap_or_else(|_| {
+ panic!(
+ "Failed to cast str {} to type {}",
+ part_value, part_column.1
+ )
+ })
+ })
.collect()
});
@@ -221,6 +245,7 @@ pub async fn pruned_partition_list<'a>(
let metas: Vec<_> = list.try_collect().await?;
let batch = paths_to_batch(table_partition_cols, table_path, &metas)?;
let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
+ debug!("get mem_table: {:?}", mem_table);
// Filter the partitions using a local datafusion context
// TODO having the external context would allow us to resolve `Volatility::Stable`
@@ -245,28 +270,35 @@ pub async fn pruned_partition_list<'a>(
///
/// Note: For the last modified date, this loses precision higher than millisecond.
fn paths_to_batch(
- table_partition_cols: &[String],
+ table_partition_cols: &[(String, DataType)],
table_path: &ListingTableUrl,
metas: &[ObjectMeta],
) -> Result<RecordBatch> {
let mut key_builder = StringBuilder::with_capacity(metas.len(), 1024);
let mut length_builder = UInt64Builder::with_capacity(metas.len());
let mut modified_builder = Date64Builder::with_capacity(metas.len());
- let mut partition_builders = table_partition_cols
+ let mut partition_scalar_values = table_partition_cols
.iter()
- .map(|_| StringBuilder::with_capacity(metas.len(), 1024))
+ .map(|_| Vec::new())
.collect::<Vec<_>>();
for file_meta in metas {
if let Some(partition_values) = parse_partitions_for_path(
table_path,
&file_meta.location,
- table_partition_cols,
+ &table_partition_cols
+ .iter()
+ .map(|x| x.0.clone())
+ .collect::<Vec<_>>(),
) {
key_builder.append_value(file_meta.location.as_ref());
length_builder.append_value(file_meta.size as u64);
modified_builder.append_value(file_meta.last_modified.timestamp_millis());
for (i, part_val) in partition_values.iter().enumerate() {
- partition_builders[i].append_value(part_val);
+ let scalar_val = ScalarValue::try_from_string(
+ part_val.to_string(),
+ &table_partition_cols[i].1,
+ )?;
+ partition_scalar_values[i].push(scalar_val);
}
} else {
debug!("No partitioning for path {}", file_meta.location);
@@ -279,8 +311,13 @@ fn paths_to_batch(
ArrayBuilder::finish(&mut length_builder),
ArrayBuilder::finish(&mut modified_builder),
];
- for mut partition_builder in partition_builders {
- col_arrays.push(ArrayBuilder::finish(&mut partition_builder));
+ for (i, part_scalar_val) in partition_scalar_values.into_iter().enumerate() {
+ if part_scalar_val.is_empty() {
+ col_arrays.push(new_empty_array(&table_partition_cols[i].1));
+ } else {
+ let partition_val_array = ScalarValue::iter_to_array(part_scalar_val)?;
+ col_arrays.push(partition_val_array);
+ }
}
// put the schema together
@@ -289,8 +326,8 @@ fn paths_to_batch(
Field::new(FILE_SIZE_COLUMN_NAME, DataType::UInt64, false),
Field::new(FILE_MODIFIED_COLUMN_NAME, DataType::Date64, true),
];
- for pn in table_partition_cols {
- fields.push(Field::new(pn, DataType::Utf8, false));
+ for part_col in table_partition_cols {
+ fields.push(Field::new(&part_col.0, part_col.1.to_owned(), false));
}
let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), col_arrays)?;
@@ -366,9 +403,10 @@ fn parse_partitions_for_path<'a>(
#[cfg(test)]
mod tests {
+ use futures::StreamExt;
+
use crate::logical_expr::{case, col, lit};
use crate::test::object_store::make_test_store;
- use futures::StreamExt;
use super::*;
@@ -424,7 +462,7 @@ mod tests {
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
- &[String::from("mypartition")],
+ &[(String::from("mypartition"), DataType::Utf8)],
)
.await
.expect("partition pruning failed")
@@ -447,7 +485,7 @@ mod tests {
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
- &[String::from("mypartition")],
+ &[(String::from("mypartition"), DataType::Utf8)],
)
.await
.expect("partition pruning failed")
@@ -494,7 +532,10 @@ mod tests {
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter1, filter2, filter3],
".parquet",
- &[String::from("part1"), String::from("part2")],
+ &[
+ (String::from("part1"), DataType::Utf8),
+ (String::from("part2"), DataType::Utf8),
+ ],
)
.await
.expect("partition pruning failed")
@@ -645,7 +686,7 @@ mod tests {
];
let batches = paths_to_batch(
- &[String::from("part1")],
+ &[(String::from("part1"), DataType::Utf8)],
&ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(),
&files,
)
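The key conversion introduced in helpers.rs is ScalarValue::try_from_string: the raw string parsed from a path segment such as year=2021 is cast to the declared partition type instead of being wrapped in ScalarValue::Utf8 unconditionally. A minimal standalone sketch of that cast (the input value is hypothetical):

    use arrow::datatypes::DataType;
    use datafusion_common::ScalarValue;

    fn main() {
        // A value parsed from a path segment like "year=2021" arrives as a
        // string; the declared partition type drives the cast.
        let typed = ScalarValue::try_from_string("2021".to_string(), &DataType::Int32)
            .expect("failed to cast partition value");
        assert_eq!(typed, ScalarValue::Int32(Some(2021)));
    }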
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 0fc17422a..e3ee5f384 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -21,7 +21,7 @@ use std::str::FromStr;
use std::{any::Any, sync::Arc};
use arrow::compute::SortOptions;
-use arrow::datatypes::{Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_physical_expr::PhysicalSortExpr;
@@ -41,14 +41,14 @@ use crate::datasource::{
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
+use crate::physical_plan::file_format::partition_type_wrap;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
logical_expr::Expr,
physical_plan::{
- empty::EmptyExec,
- file_format::{FileScanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
- project_schema, ExecutionPlan, Statistics,
+ empty::EmptyExec, file_format::FileScanConfig, project_schema, ExecutionPlan,
+ Statistics,
},
};
@@ -210,9 +210,7 @@ pub struct ListingOptions {
/// partitioning expected should be named "a" and "b":
/// - If there is a third level of partitioning it will be ignored.
/// - Files that don't follow this partitioning will be ignored.
- /// Note that only `DEFAULT_PARTITION_COLUMN_DATATYPE` is currently
- /// supported for the column type.
- pub table_partition_cols: Vec<String>,
+ pub table_partition_cols: Vec<(String, DataType)>,
/// Set true to try to guess statistics from the files.
/// This can add a lot of overhead as it will usually require files
/// to be opened and at least partially parsed.
@@ -270,16 +268,19 @@ impl ListingOptions {
///
/// ```
/// use std::sync::Arc;
+ /// use arrow::datatypes::DataType;
/// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
///
/// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
- /// .with_table_partition_cols(vec!["col_a".to_string(), "col_b".to_string()]);
+ /// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
+ /// ("col_b".to_string(), DataType::Utf8)]);
///
- /// assert_eq!(listing_options.table_partition_cols, vec!["col_a", "col_b"]);
+ /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
+ /// ("col_b".to_string(), DataType::Utf8)]);
/// ```
pub fn with_table_partition_cols(
mut self,
- table_partition_cols: Vec<String>,
+ table_partition_cols: Vec<(String, DataType)>,
) -> Self {
self.table_partition_cols = table_partition_cols;
self
@@ -428,10 +429,10 @@ impl ListingTable {
// Add the partition columns to the file schema
let mut table_fields = file_schema.fields().clone();
- for part in &options.table_partition_cols {
+ for (part_col_name, part_col_type) in &options.table_partition_cols {
table_fields.push(Field::new(
- part,
- DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
+ part_col_name,
+ partition_type_wrap(part_col_type.clone()),
false,
));
}
@@ -536,6 +537,23 @@ impl TableProvider for ListingTable {
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
}
+ // extract types of partition columns
+ let table_partition_cols = self
+ .options
+ .table_partition_cols
+ .iter()
+ .map(|col| {
+ (
+ col.0.to_owned(),
+ self.table_schema
+ .field_with_name(&col.0)
+ .unwrap()
+ .data_type()
+ .clone(),
+ )
+ })
+ .collect();
+
// create the execution plan
self.options
.format
@@ -548,7 +566,7 @@ impl TableProvider for ListingTable {
projection: projection.clone(),
limit,
output_ordering: self.try_create_output_ordering()?,
- table_partition_cols: self.options.table_partition_cols.clone(),
+ table_partition_cols,
config_options: ctx.config.config_options(),
},
filters,
@@ -560,7 +578,15 @@ impl TableProvider for ListingTable {
&self,
filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
- if expr_applicable_for_cols(&self.options.table_partition_cols, filter) {
+ if expr_applicable_for_cols(
+ &self
+ .options
+ .table_partition_cols
+ .iter()
+ .map(|x| x.0.clone())
+ .collect::<Vec<_>>(),
+ filter,
+ ) {
// if filter can be handled by partition pruning, it is exact
Ok(TableProviderFilterPushDown::Exact)
} else {
@@ -807,7 +833,10 @@ mod tests {
let opt = ListingOptions::new(Arc::new(AvroFormat {}))
.with_file_extension(FileType::AVRO.get_ext())
- .with_table_partition_cols(vec![String::from("p1")])
+ .with_table_partition_cols(vec![(
+ String::from("p1"),
+ partition_type_wrap(DataType::Utf8),
+ )])
.with_target_partitions(4);
let table_path = ListingTableUrl::parse("test:///table/").unwrap();
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 3662ab361..dbb8fdc78 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -29,6 +29,7 @@ use crate::datasource::listing::{
};
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
+use arrow::datatypes::{DataType, SchemaRef};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_expr::CreateExternalTable;
@@ -88,17 +89,46 @@ impl TableProviderFactory for ListingTableFactory {
),
};
- let provided_schema = if cmd.schema.fields().is_empty() {
- None
+ let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
+ (
+ None,
+ cmd.table_partition_cols
+ .iter()
+ .map(|x| (x.clone(), DataType::Utf8))
+ .collect::<Vec<_>>(),
+ )
} else {
- Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
+ let schema: SchemaRef = Arc::new(cmd.schema.as_ref().to_owned().into());
+ let table_partition_cols = cmd
+ .table_partition_cols
+ .iter()
+ .map(|col| {
+ schema.field_with_name(col).map_err(|arrow_err| {
+ DataFusionError::Execution(arrow_err.to_string())
+ })
+ })
+ .collect::<datafusion_common::Result<Vec<_>>>()?
+ .into_iter()
+ .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
+ .collect();
+ // exclude partition columns to support creating partitioned external table
+ // with a specified column definition like
+ // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
+ let mut project_idx = Vec::new();
+ for i in 0..schema.fields().len() {
+ if !cmd.table_partition_cols.contains(schema.field(i).name()) {
+ project_idx.push(i);
+ }
+ }
+ let schema = Arc::new(schema.project(&project_idx)?);
+ (Some(schema), table_partition_cols)
};
let options = ListingOptions::new(file_format)
.with_collect_stat(state.config.collect_statistics)
.with_file_extension(file_extension)
.with_target_partitions(state.config.target_partitions)
- .with_table_partition_cols(cmd.table_partition_cols.clone())
+ .with_table_partition_cols(table_partition_cols)
.with_file_sort_order(None);
let table_path = ListingTableUrl::parse(&cmd.location)?;
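As the comment in the hunk above notes, the factory now supports a partitioned external table whose columns (including the partition column) are fully declared in the DDL; the partition column is projected out of the file schema but keeps its declared type. A sketch of that usage, assuming a hypothetical location and a tokio entry point:

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // c1 appears both in the column list and in PARTITIONED BY; the
        // factory removes it from the file schema and uses its declared
        // INT type for the partition column.
        ctx.sql(
            "CREATE EXTERNAL TABLE a (c0 INT, c1 INT) \
             STORED AS CSV PARTITIONED BY (c1) \
             LOCATION '/tmp/a'",
        )
        .await?;
        Ok(())
    }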
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 4e5238ed2..38e16fb03 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -37,6 +37,7 @@ use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
/// In-memory table
+#[derive(Debug)]
pub struct MemTable {
schema: SchemaRef,
batches: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index e130ac84d..17f59ec7f 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -19,7 +19,7 @@
use std::sync::Arc;
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
@@ -58,8 +58,7 @@ pub struct CsvReadOptions<'a> {
/// Defaults to `FileType::CSV.get_ext().as_str()`.
pub file_extension: &'a str,
/// Partition Columns
- pub table_partition_cols: Vec<String>,
-
+ pub table_partition_cols: Vec<(String, DataType)>,
/// File compression type
pub file_compression_type: FileCompressionType,
}
@@ -117,7 +116,10 @@ impl<'a> CsvReadOptions<'a> {
}
/// Specify table_partition_cols for partition pruning
- pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+ pub fn table_partition_cols(
+ mut self,
+ table_partition_cols: Vec<(String, DataType)>,
+ ) -> Self {
self.table_partition_cols = table_partition_cols;
self
}
@@ -164,7 +166,7 @@ pub struct ParquetReadOptions<'a> {
/// Defaults to ".parquet".
pub file_extension: &'a str,
/// Partition Columns
- pub table_partition_cols: Vec<String>,
+ pub table_partition_cols: Vec<(String, DataType)>,
/// Should DataFusion parquet reader use the predicate to prune data,
/// overridden by value on execution::context::SessionConfig
// TODO move this into ConfigOptions
@@ -205,7 +207,10 @@ impl<'a> ParquetReadOptions<'a> {
}
/// Specify table_partition_cols for partition pruning
- pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+ pub fn table_partition_cols(
+ mut self,
+ table_partition_cols: Vec<(String, DataType)>,
+ ) -> Self {
self.table_partition_cols = table_partition_cols;
self
}
@@ -238,7 +243,7 @@ pub struct AvroReadOptions<'a> {
/// Defaults to `FileType::AVRO.get_ext().as_str()`.
pub file_extension: &'a str,
/// Partition Columns
- pub table_partition_cols: Vec<String>,
+ pub table_partition_cols: Vec<(String, DataType)>,
}
impl<'a> Default for AvroReadOptions<'a> {
@@ -253,7 +258,10 @@ impl<'a> Default for AvroReadOptions<'a> {
impl<'a> AvroReadOptions<'a> {
/// Specify table_partition_cols for partition pruning
- pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+ pub fn table_partition_cols(
+ mut self,
+ table_partition_cols: Vec<(String, DataType)>,
+ ) -> Self {
self.table_partition_cols = table_partition_cols;
self
}
@@ -279,16 +287,13 @@ impl<'a> AvroReadOptions<'a> {
pub struct NdJsonReadOptions<'a> {
/// The data source schema.
pub schema: Option<SchemaRef>,
-
/// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
pub schema_infer_max_records: usize,
-
/// File extension; only files with this extension are selected for data input.
/// Defaults to `FileType::JSON.get_ext().as_str()`.
pub file_extension: &'a str,
/// Partition Columns
- pub table_partition_cols: Vec<String>,
-
+ pub table_partition_cols: Vec<(String, DataType)>,
/// File compression type
pub file_compression_type: FileCompressionType,
}
@@ -307,7 +312,10 @@ impl<'a> Default for NdJsonReadOptions<'a> {
impl<'a> NdJsonReadOptions<'a> {
/// Specify table_partition_cols for partition pruning
- pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+ pub fn table_partition_cols(
+ mut self,
+ table_partition_cols: Vec<(String, DataType)>,
+ ) -> Self {
self.table_partition_cols = table_partition_cols;
self
}
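All four reader-option structs (CSV, Parquet, Avro, NDJSON) get the same signature change. A sketch of the CSV variant, assuming a hypothetical directory laid out as mytable/date=2021-10-27/...:

    use arrow::datatypes::DataType;
    use datafusion::error::Result;
    use datafusion::prelude::{CsvReadOptions, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // The `date` partition column now comes back as Date32 rather
        // than as a Utf8 string.
        let df = ctx
            .read_csv(
                "mytable/",
                CsvReadOptions::new()
                    .table_partition_cols(vec![("date".to_string(), DataType::Date32)]),
            )
            .await?;
        df.show().await?;
        Ok(())
    }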
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 59faac8fa..af4582c21 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -208,6 +208,7 @@ mod tests {
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::physical_plan::file_format::partition_type_wrap;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use crate::test::object_store::local_unpartitioned_file;
@@ -374,7 +375,10 @@ mod tests {
file_schema,
statistics: Statistics::default(),
limit: None,
- table_partition_cols: vec!["date".to_owned()],
+ table_partition_cols: vec![(
+ "date".to_owned(),
+ partition_type_wrap(DataType::Utf8),
+ )],
config_options: ConfigOptions::new().into_shareable(),
output_ordering: None,
});
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 8b50755f4..454770349 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -287,6 +287,7 @@ mod tests {
use super::*;
use crate::datasource::file_format::file_type::FileType;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
+ use crate::physical_plan::file_format::partition_type_wrap;
use crate::prelude::*;
use crate::test::{partitioned_csv_config, partitioned_file_groups};
use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
@@ -488,7 +489,8 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups)?;
// Add partition columns
- config.table_partition_cols = vec!["date".to_owned()];
+ config.table_partition_cols =
+ vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))];
config.file_groups[0][0].partition_values =
vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 063238a6d..51a911d51 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -182,7 +182,11 @@ impl<F: FileOpener> FileStream<F> {
let (projected_schema, _) = config.project();
let pc_projector = PartitionColumnProjector::new(
projected_schema.clone(),
- &config.table_partition_cols,
+ &config
+ .table_partition_cols
+ .iter()
+ .map(|x| x.0.clone())
+ .collect::<Vec<_>>(),
);
let files = config.file_groups[partition].clone();
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index fbeab4cbf..f2ccc16bc 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -52,7 +52,6 @@ use crate::{
};
use arrow::array::{new_null_array, UInt16BufferBuilder};
use arrow::record_batch::RecordBatchOptions;
-use lazy_static::lazy_static;
use log::{debug, info};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -65,9 +64,9 @@ use std::{
use super::{ColumnStatistics, Statistics};
-lazy_static! {
- /// The datatype used for all partitioning columns for now
- pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
+/// convert logical type of partition column to physical type: Dictionary(UInt16, val_type)
+pub fn partition_type_wrap(val_type: DataType) -> DataType {
+ DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}
/// The base configurations to provide when creating a physical plan for
@@ -99,8 +98,8 @@ pub struct FileScanConfig {
/// The maximum number of records to read from this plan. If None,
/// all records after filtering are returned.
pub limit: Option<usize>,
- /// The partitioning column names
- pub table_partition_cols: Vec<String>,
+ /// The partitioning columns
+ pub table_partition_cols: Vec<(String, DataType)>,
/// The order in which the data is sorted, if known.
pub output_ordering: Option<Vec<PhysicalSortExpr>>,
/// Configuration options passed to the physical plans
@@ -134,8 +133,8 @@ impl FileScanConfig {
} else {
let partition_idx = idx - self.file_schema.fields().len();
table_fields.push(Field::new(
- &self.table_partition_cols[partition_idx],
- DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
+ &self.table_partition_cols[partition_idx].0,
+ self.table_partition_cols[partition_idx].1.to_owned(),
false,
));
// TODO provide accurate stat for partition column (#1186)
@@ -406,10 +405,7 @@ fn create_dict_array(
};
// create data type
- let data_type =
- DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val.get_datatype()));
-
- debug_assert_eq!(data_type, *DEFAULT_PARTITION_COLUMN_DATATYPE);
+ let data_type = partition_type_wrap(val.get_datatype());
// assemble pieces together
let mut builder = ArrayData::builder(data_type)
@@ -539,7 +535,7 @@ mod tests {
Arc::clone(&file_schema),
None,
Statistics::default(),
- vec!["date".to_owned()],
+ vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
);
let (proj_schema, proj_statistics) = conf.project();
@@ -585,7 +581,7 @@ mod tests {
),
..Default::default()
},
- vec!["date".to_owned()],
+ vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
);
let (proj_schema, proj_statistics) = conf.project();
@@ -615,8 +611,11 @@ mod tests {
("b", &vec![-2, -1, 0]),
("c", &vec![10, 11, 12]),
);
- let partition_cols =
- vec!["year".to_owned(), "month".to_owned(), "day".to_owned()];
+ let partition_cols = vec![
+ ("year".to_owned(), partition_type_wrap(DataType::Utf8)),
+ ("month".to_owned(), partition_type_wrap(DataType::Utf8)),
+ ("day".to_owned(), partition_type_wrap(DataType::Utf8)),
+ ];
// create a projected schema
let conf = config_for_projection(
file_batch.schema(),
@@ -633,7 +632,13 @@ mod tests {
);
let (proj_schema, _) = conf.project();
// created a projector for that projected schema
- let mut proj = PartitionColumnProjector::new(proj_schema, &partition_cols);
+ let mut proj = PartitionColumnProjector::new(
+ proj_schema,
+ &partition_cols
+ .iter()
+ .map(|x| x.0.clone())
+ .collect::<Vec<_>>(),
+ );
// project first batch
let projected_batch = proj
@@ -777,7 +782,7 @@ mod tests {
file_schema: SchemaRef,
projection: Option<Vec<usize>>,
statistics: Statistics,
- table_partition_cols: Vec<String>,
+ table_partition_cols: Vec<(String, DataType)>,
) -> FileScanConfig {
FileScanConfig {
file_schema,
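partition_type_wrap replaces the old DEFAULT_PARTITION_COLUMN_DATATYPE lazy_static: the physical representation of a partition column is still a UInt16-keyed dictionary (the value repeats for every row of a file), but the dictionary's value type is now the declared logical type rather than always Utf8. A minimal sketch of what the wrapper produces (the function body mirrors the one added above):

    use arrow::datatypes::DataType;

    // Partition values repeat for every row read from a given file, so they
    // are dictionary-encoded with UInt16 keys; only the value type varies.
    fn partition_type_wrap(val_type: DataType) -> DataType {
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
    }

    fn main() {
        assert_eq!(
            partition_type_wrap(DataType::Int32),
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
        );
    }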
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index fa68f3072..cfd1c2194 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -725,6 +725,7 @@ mod tests {
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::options::CsvReadOptions;
use crate::physical_plan::displayable;
+ use crate::physical_plan::file_format::partition_type_wrap;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use crate::{
@@ -1441,9 +1442,9 @@ mod tests {
projection: Some(vec![0, 1, 2, 12]),
limit: None,
table_partition_cols: vec![
- "year".to_owned(),
- "month".to_owned(),
- "day".to_owned(),
+ ("year".to_owned(), partition_type_wrap(DataType::Utf8)),
+ ("month".to_owned(), partition_type_wrap(DataType::Utf8)),
+ ("day".to_owned(), partition_type_wrap(DataType::Utf8)),
],
config_options: ConfigOptions::new().into_shareable(),
output_ordering: None,
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 028ddf14e..41ae75864 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -17,6 +17,7 @@
//! Test queries on partitioned datasets
+use arrow::datatypes::DataType;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
@@ -56,7 +57,11 @@ async fn parquet_distinct_partition_col() -> Result<()> {
"year=2021/month=10/day=09/file.parquet",
"year=2021/month=10/day=28/file.parquet",
],
- &["year", "month", "day"],
+ &[
+ ("year", DataType::Int32),
+ ("month", DataType::Utf8),
+ ("day", DataType::Utf8),
+ ],
"mirror:///",
"alltypes_plain.parquet",
)
@@ -195,7 +200,7 @@ async fn csv_filter_with_file_col() -> Result<()> {
"mytable/date=2021-10-27/file.csv",
"mytable/date=2021-10-28/file.csv",
],
- &["date"],
+ &[("date", DataType::Utf8)],
"mirror:///mytable/",
);
@@ -221,6 +226,42 @@ async fn csv_filter_with_file_col() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn csv_filter_with_file_nonstring_col() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ register_partitioned_aggregate_csv(
+ &ctx,
+ &[
+ "mytable/date=2021-10-27/file.csv",
+ "mytable/date=2021-10-28/file.csv",
+ ],
+ &[("date", DataType::Date32)],
+ "mirror:///mytable/",
+ );
+
+ let result = ctx
+ .sql("SELECT c1, c2, date FROM t WHERE date > '2021-10-27' LIMIT 5")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+ "+----+----+------------+",
+ "| c1 | c2 | date |",
+ "+----+----+------------+",
+ "| a | 1 | 2021-10-28 |",
+ "| b | 1 | 2021-10-28 |",
+ "| b | 5 | 2021-10-28 |",
+ "| c | 2 | 2021-10-28 |",
+ "| d | 5 | 2021-10-28 |",
+ "+----+----+------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &result);
+
+ Ok(())
+}
+
#[tokio::test]
async fn csv_projection_on_partition() -> Result<()> {
let ctx = SessionContext::new();
@@ -231,7 +272,7 @@ async fn csv_projection_on_partition() -> Result<()> {
"mytable/date=2021-10-27/file.csv",
"mytable/date=2021-10-28/file.csv",
],
- &["date"],
+ &[("date", DataType::Date32)],
"mirror:///mytable/",
);
@@ -268,7 +309,7 @@ async fn csv_grouping_by_partition() -> Result<()> {
"mytable/date=2021-10-27/file.csv",
"mytable/date=2021-10-28/file.csv",
],
- &["date"],
+ &[("date", DataType::Date32)],
"mirror:///mytable/",
);
@@ -302,7 +343,11 @@ async fn parquet_multiple_partitions() -> Result<()> {
"year=2021/month=10/day=09/file.parquet",
"year=2021/month=10/day=28/file.parquet",
],
- &["year", "month", "day"],
+ &[
+ ("year", DataType::Utf8),
+ ("month", DataType::Utf8),
+ ("day", DataType::Utf8),
+ ],
"mirror:///",
"alltypes_plain.parquet",
)
@@ -333,6 +378,52 @@ async fn parquet_multiple_partitions() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn parquet_multiple_nonstring_partitions() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ register_partitioned_alltypes_parquet(
+ &ctx,
+ &[
+ "year=2021/month=09/day=09/file.parquet",
+ "year=2021/month=10/day=09/file.parquet",
+ "year=2021/month=10/day=28/file.parquet",
+ ],
+ &[
+ ("year", DataType::Int32),
+ ("month", DataType::Int32),
+ ("day", DataType::Int32),
+ ],
+ "mirror:///",
+ "alltypes_plain.parquet",
+ )
+ .await;
+
+ let result = ctx
+ .sql("SELECT id, day FROM t WHERE day=month ORDER BY id")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+ "+----+-----+",
+ "| id | day |",
+ "+----+-----+",
+ "| 0 | 9 |",
+ "| 1 | 9 |",
+ "| 2 | 9 |",
+ "| 3 | 9 |",
+ "| 4 | 9 |",
+ "| 5 | 9 |",
+ "| 6 | 9 |",
+ "| 7 | 9 |",
+ "+----+-----+",
+ ];
+ assert_batches_sorted_eq!(expected, &result);
+
+ Ok(())
+}
+
#[tokio::test]
async fn parquet_statistics() -> Result<()> {
let ctx = SessionContext::new();
@@ -344,7 +435,11 @@ async fn parquet_statistics() -> Result<()> {
"year=2021/month=10/day=09/file.parquet",
"year=2021/month=10/day=28/file.parquet",
],
- &["year", "month", "day"],
+ &[
+ ("year", DataType::Int32),
+ ("month", DataType::Utf8),
+ ("day", DataType::Utf8),
+ ],
"mirror:///",
// This is the only file we found in the test set with
// actual stats. It has 1 column / 1 row.
@@ -404,7 +499,7 @@ async fn parquet_overlapping_columns() -> Result<()> {
"id=2/file.parquet",
"id=3/file.parquet",
],
- &["id"],
+ &[("id", DataType::Int64)],
"mirror:///",
"alltypes_plain.parquet",
)
@@ -422,7 +517,7 @@ async fn parquet_overlapping_columns() -> Result<()> {
fn register_partitioned_aggregate_csv(
ctx: &SessionContext,
store_paths: &[&str],
- partition_cols: &[&str],
+ partition_cols: &[(&str, DataType)],
table_path: &str,
) {
let testdata = arrow_test_data();
@@ -436,7 +531,10 @@ fn register_partitioned_aggregate_csv(
let options = ListingOptions::new(Arc::new(CsvFormat::default()))
.with_table_partition_cols(
- partition_cols.iter().map(|&s| s.to_owned()).collect(),
+ partition_cols
+ .iter()
+ .map(|x| (x.0.to_owned(), x.1.clone()))
+ .collect::<Vec<_>>(),
);
let table_path = ListingTableUrl::parse(table_path).unwrap();
@@ -452,7 +550,7 @@ fn register_partitioned_aggregate_csv(
async fn register_partitioned_alltypes_parquet(
ctx: &SessionContext,
store_paths: &[&str],
- partition_cols: &[&str],
+ partition_cols: &[(&str, DataType)],
table_path: &str,
source_file: &str,
) {
@@ -466,7 +564,10 @@ async fn register_partitioned_alltypes_parquet(
let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
.with_table_partition_cols(
- partition_cols.iter().map(|&s| s.to_owned()).collect(),
+ partition_cols
+ .iter()
+ .map(|x| (x.0.to_owned(), x.1.clone()))
+ .collect::<Vec<_>>(),
);
let table_path = ListingTableUrl::parse(table_path).unwrap();
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 452f41b7d..fa2cbc00d 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::fs::create_dir_all;
use std::io::Write;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
@@ -341,6 +342,73 @@ async fn create_external_table_with_timestamps() {
assert_batches_sorted_eq!(expected, &result);
}
+#[tokio::test]
+async fn create_partitioned_external_table() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let data0 = "Jorge,2018-12-13T12:12:10.011Z";
+ let data1 = "Andrew,2018-11-13T17:11:10.011Z";
+
+ let tmp_dir = TempDir::new().unwrap();
+
+ // scope to ensure the file is closed and written
+ {
+ let part0 = tmp_dir.path().join("c_date=2018-12-13");
+ let file_path0 = part0.join("timestamps.csv");
+ create_dir_all(part0).expect("creating part dir");
+ File::options()
+ .read(true)
+ .write(true)
+ .create(true)
+ .open(&file_path0)
+ .expect("creating temp file")
+ .write_all(data0.as_bytes())
+ .expect("writing data");
+
+ let part1 = tmp_dir.path().join("c_date=2018-11-13");
+ let file_path1 = part1.join("timestamps.csv");
+ create_dir_all(part1).expect("creating part dir");
+ File::options()
+ .read(true)
+ .write(true)
+ .create(true)
+ .open(&file_path1)
+ .expect("creating temp file")
+ .write_all(data1.as_bytes())
+ .expect("writing data");
+ }
+
+ let sql = format!(
+ "CREATE EXTERNAL TABLE csv_with_timestamps (
+ name VARCHAR,
+ ts TIMESTAMP,
+ c_date DATE,
+ )
+ STORED AS CSV
+ PARTITIONED BY (c_date)
+ LOCATION '{}'
+ ",
+ tmp_dir.path().to_str().expect("path is utf8")
+ );
+
+ plan_and_collect(&ctx, &sql)
+ .await
+ .expect("Executing CREATE EXTERNAL TABLE");
+
+ let sql = "SELECT * from csv_with_timestamps where c_date='2018-11-13'";
+ let result = plan_and_collect(&ctx, sql).await.unwrap();
+ let expected = vec![
+ "+--------+-------------------------+------------+",
+ "| name | ts | c_date |",
+ "+--------+-------------------------+------------+",
+ "| Andrew | 2018-11-13T17:11:10.011 | 2018-11-13 |",
+ "+--------+-------------------------+------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &result);
+
+ Ok(())
+}
+
#[tokio::test]
#[should_panic(expected = "already exists")]
async fn sql_create_duplicate_table() {
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 499e92911..42d3401ef 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -461,7 +461,21 @@ impl AsLogicalPlan for LogicalPlanNode {
let options = ListingOptions::new(file_format)
.with_file_extension(scan.file_extension.clone())
- .with_table_partition_cols(scan.table_partition_cols.clone())
+ .with_table_partition_cols(
+ scan.table_partition_cols
+ .iter()
+ .map(|col| {
+ (
+ col.clone(),
+ schema
+ .field_with_name(col)
+ .unwrap()
+ .data_type()
+ .clone(),
+ )
+ })
+ .collect(),
+ )
.with_collect_stat(scan.collect_stat)
.with_target_partitions(scan.target_partitions as usize)
.with_file_sort_order(file_sort_order);
@@ -876,7 +890,7 @@ impl AsLogicalPlan for LogicalPlanNode {
FileFormatType::Avro(protobuf::AvroFormat {})
} else {
return Err(proto_error(format!(
- "Error converting file format, {:?} is invalid as a datafusion foramt.",
+ "Error converting file format, {:?} is invalid as a datafusion format.",
listing_table.options().format
)));
};
@@ -901,7 +915,9 @@ impl AsLogicalPlan for LogicalPlanNode {
file_extension: options.file_extension.clone(),
table_partition_cols: options
.table_partition_cols
- .clone(),
+ .iter()
+ .map(|x| x.0.clone())
+ .collect::<Vec<_>>(),
paths: listing_table
.table_paths()
.iter()