Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/23 17:58:40 UTC

[arrow-datafusion] branch master updated: Support types other than String for partition columns on ListingTables (#4221)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 55bf8e9c2 Support types other than String for partition columns on ListingTables (#4221)
55bf8e9c2 is described below

commit 55bf8e9c2ca6d358e7dc4c37e1439de38008f960
Author: Jie Han <11...@users.noreply.github.com>
AuthorDate: Thu Nov 24 01:58:33 2022 +0800

    Support types other than String for partition columns on ListingTables (#4221)
    
    * partition columns config
    
    * solve test compile problems
    
    * fix ut
    
    * add partition column types config for each file reader options
    
    * get correct partition column types from partitioned files
    
    * remove DEFAULT_PARTITION_COLUMN_TYPE
    
    * update pruned_partition_list
    
    * change api
    
    * add some tests
    
    * create partitioned external table with schema
    
    * upd
    
    * Update datafusion/core/src/datasource/listing_table_factory.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * code clean
    
    * code clean
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/core/src/datasource/listing/helpers.rs  |  77 ++++++++++---
 datafusion/core/src/datasource/listing/table.rs    |  61 +++++++---
 .../core/src/datasource/listing_table_factory.rs   |  38 ++++++-
 datafusion/core/src/datasource/memory.rs           |   1 +
 datafusion/core/src/execution/options.rs           |  34 +++---
 .../core/src/physical_plan/file_format/avro.rs     |   6 +-
 .../core/src/physical_plan/file_format/csv.rs      |   4 +-
 .../src/physical_plan/file_format/file_stream.rs   |   6 +-
 .../core/src/physical_plan/file_format/mod.rs      |  41 ++++---
 .../core/src/physical_plan/file_format/parquet.rs  |   7 +-
 datafusion/core/tests/path_partition.rs            | 123 +++++++++++++++++++--
 datafusion/core/tests/sql/create_drop.rs           |  68 ++++++++++++
 datafusion/proto/src/logical_plan.rs               |  22 +++-
 13 files changed, 399 insertions(+), 89 deletions(-)
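
As a quick orientation before the diff: the change replaces bare partition column names with (name, DataType) pairs throughout the listing machinery, so a directory like year=2021/... can be read back as an Int32 column. A minimal sketch of the new call shape follows; the file format, column names and types are illustrative, only the with_table_partition_cols signature comes from this commit.

    use std::sync::Arc;

    use arrow::datatypes::DataType;
    use datafusion::datasource::{file_format::parquet::ParquetFormat, listing::ListingOptions};

    fn typed_partition_listing_options() -> ListingOptions {
        // Partition columns now carry their own DataType instead of defaulting
        // to Utf8, so `year=2021/month=10/...` can be typed as Int32.
        ListingOptions::new(Arc::new(ParquetFormat::default()))
            .with_table_partition_cols(vec![
                ("year".to_string(), DataType::Int32),
                ("month".to_string(), DataType::Int32),
            ])
    }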

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index f1a34e665..3cfe9ec14 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -19,6 +19,7 @@
 
 use std::sync::Arc;
 
+use arrow::array::new_empty_array;
 use arrow::{
     array::{
         Array, ArrayBuilder, ArrayRef, Date64Array, Date64Builder, StringBuilder,
@@ -176,7 +177,7 @@ pub async fn pruned_partition_list<'a>(
     table_path: &'a ListingTableUrl,
     filters: &'a [Expr],
     file_extension: &'a str,
-    table_partition_cols: &'a [String],
+    table_partition_cols: &'a [(String, DataType)],
 ) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
     let list = table_path.list_all_files(store, file_extension);
 
@@ -187,7 +188,15 @@ pub async fn pruned_partition_list<'a>(
 
     let applicable_filters: Vec<_> = filters
         .iter()
-        .filter(|f| expr_applicable_for_cols(table_partition_cols, f))
+        .filter(|f| {
+            expr_applicable_for_cols(
+                &table_partition_cols
+                    .iter()
+                    .map(|x| x.0.clone())
+                    .collect::<Vec<_>>(),
+                f,
+            )
+        })
         .collect();
 
     if applicable_filters.is_empty() {
@@ -200,11 +209,26 @@ pub async fn pruned_partition_list<'a>(
                 let parsed_path = parse_partitions_for_path(
                     table_path,
                     &object_meta.location,
-                    table_partition_cols,
+                    &table_partition_cols
+                        .iter()
+                        .map(|x| x.0.clone())
+                        .collect::<Vec<_>>(),
                 )
                 .map(|p| {
                     p.iter()
-                        .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
+                        .zip(table_partition_cols)
+                        .map(|(&part_value, part_column)| {
+                            ScalarValue::try_from_string(
+                                part_value.to_string(),
+                                &part_column.1,
+                            )
+                            .unwrap_or_else(|_| {
+                                panic!(
+                                    "Failed to cast str {} to type {}",
+                                    part_value, part_column.1
+                                )
+                            })
+                        })
                         .collect()
                 });
 
@@ -221,6 +245,7 @@ pub async fn pruned_partition_list<'a>(
         let metas: Vec<_> = list.try_collect().await?;
         let batch = paths_to_batch(table_partition_cols, table_path, &metas)?;
         let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
+        debug!("get mem_table: {:?}", mem_table);
 
         // Filter the partitions using a local datafusion context
         // TODO having the external context would allow us to resolve `Volatility::Stable`
@@ -245,28 +270,35 @@ pub async fn pruned_partition_list<'a>(
 ///
 /// Note: For the last modified date, this loses any precision beyond milliseconds.
 fn paths_to_batch(
-    table_partition_cols: &[String],
+    table_partition_cols: &[(String, DataType)],
     table_path: &ListingTableUrl,
     metas: &[ObjectMeta],
 ) -> Result<RecordBatch> {
     let mut key_builder = StringBuilder::with_capacity(metas.len(), 1024);
     let mut length_builder = UInt64Builder::with_capacity(metas.len());
     let mut modified_builder = Date64Builder::with_capacity(metas.len());
-    let mut partition_builders = table_partition_cols
+    let mut partition_scalar_values = table_partition_cols
         .iter()
-        .map(|_| StringBuilder::with_capacity(metas.len(), 1024))
+        .map(|_| Vec::new())
         .collect::<Vec<_>>();
     for file_meta in metas {
         if let Some(partition_values) = parse_partitions_for_path(
             table_path,
             &file_meta.location,
-            table_partition_cols,
+            &table_partition_cols
+                .iter()
+                .map(|x| x.0.clone())
+                .collect::<Vec<_>>(),
         ) {
             key_builder.append_value(file_meta.location.as_ref());
             length_builder.append_value(file_meta.size as u64);
             modified_builder.append_value(file_meta.last_modified.timestamp_millis());
             for (i, part_val) in partition_values.iter().enumerate() {
-                partition_builders[i].append_value(part_val);
+                let scalar_val = ScalarValue::try_from_string(
+                    part_val.to_string(),
+                    &table_partition_cols[i].1,
+                )?;
+                partition_scalar_values[i].push(scalar_val);
             }
         } else {
             debug!("No partitioning for path {}", file_meta.location);
@@ -279,8 +311,13 @@ fn paths_to_batch(
         ArrayBuilder::finish(&mut length_builder),
         ArrayBuilder::finish(&mut modified_builder),
     ];
-    for mut partition_builder in partition_builders {
-        col_arrays.push(ArrayBuilder::finish(&mut partition_builder));
+    for (i, part_scalar_val) in partition_scalar_values.into_iter().enumerate() {
+        if part_scalar_val.is_empty() {
+            col_arrays.push(new_empty_array(&table_partition_cols[i].1));
+        } else {
+            let partition_val_array = ScalarValue::iter_to_array(part_scalar_val)?;
+            col_arrays.push(partition_val_array);
+        }
     }
 
     // put the schema together
@@ -289,8 +326,8 @@ fn paths_to_batch(
         Field::new(FILE_SIZE_COLUMN_NAME, DataType::UInt64, false),
         Field::new(FILE_MODIFIED_COLUMN_NAME, DataType::Date64, true),
     ];
-    for pn in table_partition_cols {
-        fields.push(Field::new(pn, DataType::Utf8, false));
+    for part_col in table_partition_cols {
+        fields.push(Field::new(&part_col.0, part_col.1.to_owned(), false));
     }
 
     let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), col_arrays)?;
@@ -366,9 +403,10 @@ fn parse_partitions_for_path<'a>(
 
 #[cfg(test)]
 mod tests {
+    use futures::StreamExt;
+
     use crate::logical_expr::{case, col, lit};
     use crate::test::object_store::make_test_store;
-    use futures::StreamExt;
 
     use super::*;
 
@@ -424,7 +462,7 @@ mod tests {
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter],
             ".parquet",
-            &[String::from("mypartition")],
+            &[(String::from("mypartition"), DataType::Utf8)],
         )
         .await
         .expect("partition pruning failed")
@@ -447,7 +485,7 @@ mod tests {
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter],
             ".parquet",
-            &[String::from("mypartition")],
+            &[(String::from("mypartition"), DataType::Utf8)],
         )
         .await
         .expect("partition pruning failed")
@@ -494,7 +532,10 @@ mod tests {
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter1, filter2, filter3],
             ".parquet",
-            &[String::from("part1"), String::from("part2")],
+            &[
+                (String::from("part1"), DataType::Utf8),
+                (String::from("part2"), DataType::Utf8),
+            ],
         )
         .await
         .expect("partition pruning failed")
@@ -645,7 +686,7 @@ mod tests {
         ];
 
         let batches = paths_to_batch(
-            &[String::from("part1")],
+            &[(String::from("part1"), DataType::Utf8)],
             &ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(),
             &files,
         )
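
The core of the helpers.rs change is that each hive-style path segment is now cast to the declared column type via ScalarValue::try_from_string rather than kept as Utf8. A small sketch of that conversion (the literal values here are made up):

    use arrow::datatypes::DataType;
    use datafusion_common::{Result, ScalarValue};

    fn typed_partition_value() -> Result<ScalarValue> {
        // For a path like `year=2021/file.parquet`, the raw segment "2021" is
        // parsed against the partition column's declared type.
        ScalarValue::try_from_string("2021".to_string(), &DataType::Int32)
        // -> Ok(ScalarValue::Int32(Some(2021)))
    }
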
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 0fc17422a..e3ee5f384 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -21,7 +21,7 @@ use std::str::FromStr;
 use std::{any::Any, sync::Arc};
 
 use arrow::compute::SortOptions;
-use arrow::datatypes::{Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use async_trait::async_trait;
 use dashmap::DashMap;
 use datafusion_physical_expr::PhysicalSortExpr;
@@ -41,14 +41,14 @@ use crate::datasource::{
 };
 use crate::logical_expr::TableProviderFilterPushDown;
 use crate::physical_plan;
+use crate::physical_plan::file_format::partition_type_wrap;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::SessionState,
     logical_expr::Expr,
     physical_plan::{
-        empty::EmptyExec,
-        file_format::{FileScanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
-        project_schema, ExecutionPlan, Statistics,
+        empty::EmptyExec, file_format::FileScanConfig, project_schema, ExecutionPlan,
+        Statistics,
     },
 };
 
@@ -210,9 +210,7 @@ pub struct ListingOptions {
     /// partitioning expected should be named "a" and "b":
     /// - If there is a third level of partitioning it will be ignored.
     /// - Files that don't follow this partitioning will be ignored.
-    /// Note that only `DEFAULT_PARTITION_COLUMN_DATATYPE` is currently
-    /// supported for the column type.
-    pub table_partition_cols: Vec<String>,
+    pub table_partition_cols: Vec<(String, DataType)>,
     /// Set true to try to guess statistics from the files.
     /// This can add a lot of overhead as it will usually require files
     /// to be opened and at least partially parsed.
@@ -270,16 +268,19 @@ impl ListingOptions {
     ///
     /// ```
     /// use std::sync::Arc;
+    /// use arrow::datatypes::DataType;
     /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
     ///
     /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
-    ///     .with_table_partition_cols(vec!["col_a".to_string(), "col_b".to_string()]);
+    ///     .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
+    ///         ("col_b".to_string(), DataType::Utf8)]);
     ///
-    /// assert_eq!(listing_options.table_partition_cols, vec!["col_a", "col_b"]);
+    /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
+    ///     ("col_b".to_string(), DataType::Utf8)]);
     /// ```
     pub fn with_table_partition_cols(
         mut self,
-        table_partition_cols: Vec<String>,
+        table_partition_cols: Vec<(String, DataType)>,
     ) -> Self {
         self.table_partition_cols = table_partition_cols;
         self
@@ -428,10 +429,10 @@ impl ListingTable {
 
         // Add the partition columns to the file schema
         let mut table_fields = file_schema.fields().clone();
-        for part in &options.table_partition_cols {
+        for (part_col_name, part_col_type) in &options.table_partition_cols {
             table_fields.push(Field::new(
-                part,
-                DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
+                part_col_name,
+                partition_type_wrap(part_col_type.clone()),
                 false,
             ));
         }
@@ -536,6 +537,23 @@ impl TableProvider for ListingTable {
             return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
         }
 
+        // extract types of partition columns
+        let table_partition_cols = self
+            .options
+            .table_partition_cols
+            .iter()
+            .map(|col| {
+                (
+                    col.0.to_owned(),
+                    self.table_schema
+                        .field_with_name(&col.0)
+                        .unwrap()
+                        .data_type()
+                        .clone(),
+                )
+            })
+            .collect();
+
         // create the execution plan
         self.options
             .format
@@ -548,7 +566,7 @@ impl TableProvider for ListingTable {
                     projection: projection.clone(),
                     limit,
                     output_ordering: self.try_create_output_ordering()?,
-                    table_partition_cols: self.options.table_partition_cols.clone(),
+                    table_partition_cols,
                     config_options: ctx.config.config_options(),
                 },
                 filters,
@@ -560,7 +578,15 @@ impl TableProvider for ListingTable {
         &self,
         filter: &Expr,
     ) -> Result<TableProviderFilterPushDown> {
-        if expr_applicable_for_cols(&self.options.table_partition_cols, filter) {
+        if expr_applicable_for_cols(
+            &self
+                .options
+                .table_partition_cols
+                .iter()
+                .map(|x| x.0.clone())
+                .collect::<Vec<_>>(),
+            filter,
+        ) {
             // if filter can be handled by partition pruning, it is exact
             Ok(TableProviderFilterPushDown::Exact)
         } else {
@@ -807,7 +833,10 @@ mod tests {
 
         let opt = ListingOptions::new(Arc::new(AvroFormat {}))
             .with_file_extension(FileType::AVRO.get_ext())
-            .with_table_partition_cols(vec![String::from("p1")])
+            .with_table_partition_cols(vec![(
+                String::from("p1"),
+                partition_type_wrap(DataType::Utf8),
+            )])
             .with_target_partitions(4);
 
         let table_path = ListingTableUrl::parse("test:///table/").unwrap();
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 3662ab361..dbb8fdc78 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -29,6 +29,7 @@ use crate::datasource::listing::{
 };
 use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
+use arrow::datatypes::{DataType, SchemaRef};
 use async_trait::async_trait;
 use datafusion_common::DataFusionError;
 use datafusion_expr::CreateExternalTable;
@@ -88,17 +89,46 @@ impl TableProviderFactory for ListingTableFactory {
             ),
         };
 
-        let provided_schema = if cmd.schema.fields().is_empty() {
-            None
+        let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
+            (
+                None,
+                cmd.table_partition_cols
+                    .iter()
+                    .map(|x| (x.clone(), DataType::Utf8))
+                    .collect::<Vec<_>>(),
+            )
         } else {
-            Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
+            let schema: SchemaRef = Arc::new(cmd.schema.as_ref().to_owned().into());
+            let table_partition_cols = cmd
+                .table_partition_cols
+                .iter()
+                .map(|col| {
+                    schema.field_with_name(col).map_err(|arrow_err| {
+                        DataFusionError::Execution(arrow_err.to_string())
+                    })
+                })
+                .collect::<datafusion_common::Result<Vec<_>>>()?
+                .into_iter()
+                .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
+                .collect();
+            // exclude partition columns to support creating partitioned external table
+            // with a specified column definition like
+            // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
+            let mut project_idx = Vec::new();
+            for i in 0..schema.fields().len() {
+                if !cmd.table_partition_cols.contains(schema.field(i).name()) {
+                    project_idx.push(i);
+                }
+            }
+            let schema = Arc::new(schema.project(&project_idx)?);
+            (Some(schema), table_partition_cols)
         };
 
         let options = ListingOptions::new(file_format)
             .with_collect_stat(state.config.collect_statistics)
             .with_file_extension(file_extension)
             .with_target_partitions(state.config.target_partitions)
-            .with_table_partition_cols(cmd.table_partition_cols.clone())
+            .with_table_partition_cols(table_partition_cols)
             .with_file_sort_order(None);
 
         let table_path = ListingTableUrl::parse(&cmd.location)?;
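
To make the projection step above concrete: when the user supplies a full column list that includes the partition columns, the factory keeps only the non-partition fields as the file schema. A sketch with hypothetical column names:

    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::ArrowError;

    fn file_schema_without_partition_col() -> Result<Schema, ArrowError> {
        // Declared as `CREATE EXTERNAL TABLE a(c0 INT, c1 INT) STORED AS CSV PARTITIONED BY (c1) ...`;
        // c1 comes from the directory names, so only c0 belongs to the file schema.
        let declared = Schema::new(vec![
            Field::new("c0", DataType::Int32, true),
            Field::new("c1", DataType::Int32, true),
        ]);
        declared.project(&[0])
    }
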
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 4e5238ed2..38e16fb03 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -37,6 +37,7 @@ use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
 
 /// In-memory table
+#[derive(Debug)]
 pub struct MemTable {
     schema: SchemaRef,
     batches: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index e130ac84d..17f59ec7f 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -19,7 +19,7 @@
 
 use std::sync::Arc;
 
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
 
 use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
 use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
@@ -58,8 +58,7 @@ pub struct CsvReadOptions<'a> {
     /// Defaults to `FileType::CSV.get_ext().as_str()`.
     pub file_extension: &'a str,
     /// Partition Columns
-    pub table_partition_cols: Vec<String>,
-
+    pub table_partition_cols: Vec<(String, DataType)>,
     /// File compression type
     pub file_compression_type: FileCompressionType,
 }
@@ -117,7 +116,10 @@ impl<'a> CsvReadOptions<'a> {
     }
 
     /// Specify table_partition_cols for partition pruning
-    pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+    pub fn table_partition_cols(
+        mut self,
+        table_partition_cols: Vec<(String, DataType)>,
+    ) -> Self {
         self.table_partition_cols = table_partition_cols;
         self
     }
@@ -164,7 +166,7 @@ pub struct ParquetReadOptions<'a> {
     /// Defaults to ".parquet".
     pub file_extension: &'a str,
     /// Partition Columns
-    pub table_partition_cols: Vec<String>,
+    pub table_partition_cols: Vec<(String, DataType)>,
     /// Should DataFusion parquet reader use the predicate to prune data,
     /// overridden by value on execution::context::SessionConfig
     // TODO move this into ConfigOptions
@@ -205,7 +207,10 @@ impl<'a> ParquetReadOptions<'a> {
     }
 
     /// Specify table_partition_cols for partition pruning
-    pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+    pub fn table_partition_cols(
+        mut self,
+        table_partition_cols: Vec<(String, DataType)>,
+    ) -> Self {
         self.table_partition_cols = table_partition_cols;
         self
     }
@@ -238,7 +243,7 @@ pub struct AvroReadOptions<'a> {
     /// Defaults to `FileType::AVRO.get_ext().as_str()`.
     pub file_extension: &'a str,
     /// Partition Columns
-    pub table_partition_cols: Vec<String>,
+    pub table_partition_cols: Vec<(String, DataType)>,
 }
 
 impl<'a> Default for AvroReadOptions<'a> {
@@ -253,7 +258,10 @@ impl<'a> Default for AvroReadOptions<'a> {
 
 impl<'a> AvroReadOptions<'a> {
     /// Specify table_partition_cols for partition pruning
-    pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+    pub fn table_partition_cols(
+        mut self,
+        table_partition_cols: Vec<(String, DataType)>,
+    ) -> Self {
         self.table_partition_cols = table_partition_cols;
         self
     }
@@ -279,16 +287,13 @@ impl<'a> AvroReadOptions<'a> {
 pub struct NdJsonReadOptions<'a> {
     /// The data source schema.
     pub schema: Option<SchemaRef>,
-
     /// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
     pub schema_infer_max_records: usize,
-
     /// File extension; only files with this extension are selected for data input.
     /// Defaults to `FileType::JSON.get_ext().as_str()`.
     pub file_extension: &'a str,
     /// Partition Columns
-    pub table_partition_cols: Vec<String>,
-
+    pub table_partition_cols: Vec<(String, DataType)>,
     /// File compression type
     pub file_compression_type: FileCompressionType,
 }
@@ -307,7 +312,10 @@ impl<'a> Default for NdJsonReadOptions<'a> {
 
 impl<'a> NdJsonReadOptions<'a> {
     /// Specify table_partition_cols for partition pruning
-    pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+    pub fn table_partition_cols(
+        mut self,
+        table_partition_cols: Vec<(String, DataType)>,
+    ) -> Self {
         self.table_partition_cols = table_partition_cols;
         self
     }
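
The read-options changes surface the same (name, DataType) pairs through the DataFrame API. A hedged usage sketch (the table name and location are placeholders):

    use arrow::datatypes::DataType;
    use datafusion::error::Result;
    use datafusion::prelude::{CsvReadOptions, SessionContext};

    async fn register_partitioned_csv(ctx: &SessionContext) -> Result<()> {
        // `date` is parsed from `date=YYYY-MM-DD/` directory names and exposed as Date32.
        let options = CsvReadOptions::new()
            .table_partition_cols(vec![("date".to_string(), DataType::Date32)]);
        ctx.register_csv("t", "file:///data/mytable/", options).await
    }
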
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 59faac8fa..af4582c21 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -208,6 +208,7 @@ mod tests {
     use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::physical_plan::file_format::partition_type_wrap;
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
     use crate::test::object_store::local_unpartitioned_file;
@@ -374,7 +375,10 @@ mod tests {
             file_schema,
             statistics: Statistics::default(),
             limit: None,
-            table_partition_cols: vec!["date".to_owned()],
+            table_partition_cols: vec![(
+                "date".to_owned(),
+                partition_type_wrap(DataType::Utf8),
+            )],
             config_options: ConfigOptions::new().into_shareable(),
             output_ordering: None,
         });
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 8b50755f4..454770349 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -287,6 +287,7 @@ mod tests {
     use super::*;
     use crate::datasource::file_format::file_type::FileType;
     use crate::physical_plan::file_format::chunked_store::ChunkedStore;
+    use crate::physical_plan::file_format::partition_type_wrap;
     use crate::prelude::*;
     use crate::test::{partitioned_csv_config, partitioned_file_groups};
     use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
@@ -488,7 +489,8 @@ mod tests {
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
 
         // Add partition columns
-        config.table_partition_cols = vec!["date".to_owned()];
+        config.table_partition_cols =
+            vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))];
         config.file_groups[0][0].partition_values =
             vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
 
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 063238a6d..51a911d51 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -182,7 +182,11 @@ impl<F: FileOpener> FileStream<F> {
         let (projected_schema, _) = config.project();
         let pc_projector = PartitionColumnProjector::new(
             projected_schema.clone(),
-            &config.table_partition_cols,
+            &config
+                .table_partition_cols
+                .iter()
+                .map(|x| x.0.clone())
+                .collect::<Vec<_>>(),
         );
 
         let files = config.file_groups[partition].clone();
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index fbeab4cbf..f2ccc16bc 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -52,7 +52,6 @@ use crate::{
 };
 use arrow::array::{new_null_array, UInt16BufferBuilder};
 use arrow::record_batch::RecordBatchOptions;
-use lazy_static::lazy_static;
 use log::{debug, info};
 use object_store::path::Path;
 use object_store::ObjectMeta;
@@ -65,9 +64,9 @@ use std::{
 
 use super::{ColumnStatistics, Statistics};
 
-lazy_static! {
-    /// The datatype used for all partitioning columns for now
-    pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
+/// convert logical type of partition column to physical type: Dictionary(UInt16, val_type)
+pub fn partition_type_wrap(val_type: DataType) -> DataType {
+    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
 }
 
 /// The base configurations to provide when creating a physical plan for
@@ -99,8 +98,8 @@ pub struct FileScanConfig {
     /// The maximum number of records to read from this plan. If None,
     /// all records after filtering are returned.
     pub limit: Option<usize>,
-    /// The partitioning column names
-    pub table_partition_cols: Vec<String>,
+    /// The partitioning columns
+    pub table_partition_cols: Vec<(String, DataType)>,
     /// The order in which the data is sorted, if known.
     pub output_ordering: Option<Vec<PhysicalSortExpr>>,
     /// Configuration options passed to the physical plans
@@ -134,8 +133,8 @@ impl FileScanConfig {
             } else {
                 let partition_idx = idx - self.file_schema.fields().len();
                 table_fields.push(Field::new(
-                    &self.table_partition_cols[partition_idx],
-                    DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
+                    &self.table_partition_cols[partition_idx].0,
+                    self.table_partition_cols[partition_idx].1.to_owned(),
                     false,
                 ));
                 // TODO provide accurate stat for partition column (#1186)
@@ -406,10 +405,7 @@ fn create_dict_array(
     };
 
     // create data type
-    let data_type =
-        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val.get_datatype()));
-
-    debug_assert_eq!(data_type, *DEFAULT_PARTITION_COLUMN_DATATYPE);
+    let data_type = partition_type_wrap(val.get_datatype());
 
     // assemble pieces together
     let mut builder = ArrayData::builder(data_type)
@@ -539,7 +535,7 @@ mod tests {
             Arc::clone(&file_schema),
             None,
             Statistics::default(),
-            vec!["date".to_owned()],
+            vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
         );
 
         let (proj_schema, proj_statistics) = conf.project();
@@ -585,7 +581,7 @@ mod tests {
                 ),
                 ..Default::default()
             },
-            vec!["date".to_owned()],
+            vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
         );
 
         let (proj_schema, proj_statistics) = conf.project();
@@ -615,8 +611,11 @@ mod tests {
             ("b", &vec![-2, -1, 0]),
             ("c", &vec![10, 11, 12]),
         );
-        let partition_cols =
-            vec!["year".to_owned(), "month".to_owned(), "day".to_owned()];
+        let partition_cols = vec![
+            ("year".to_owned(), partition_type_wrap(DataType::Utf8)),
+            ("month".to_owned(), partition_type_wrap(DataType::Utf8)),
+            ("day".to_owned(), partition_type_wrap(DataType::Utf8)),
+        ];
         // create a projected schema
         let conf = config_for_projection(
             file_batch.schema(),
@@ -633,7 +632,13 @@ mod tests {
         );
         let (proj_schema, _) = conf.project();
         // created a projector for that projected schema
-        let mut proj = PartitionColumnProjector::new(proj_schema, &partition_cols);
+        let mut proj = PartitionColumnProjector::new(
+            proj_schema,
+            &partition_cols
+                .iter()
+                .map(|x| x.0.clone())
+                .collect::<Vec<_>>(),
+        );
 
         // project first batch
         let projected_batch = proj
@@ -777,7 +782,7 @@ mod tests {
         file_schema: SchemaRef,
         projection: Option<Vec<usize>>,
         statistics: Statistics,
-        table_partition_cols: Vec<String>,
+        table_partition_cols: Vec<(String, DataType)>,
     ) -> FileScanConfig {
         FileScanConfig {
             file_schema,
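
The lazy_static DEFAULT_PARTITION_COLUMN_DATATYPE is gone; partition_type_wrap keeps the dictionary encoding but parameterizes the value type. In effect (a sketch based on the function body above):

    use arrow::datatypes::DataType;
    use datafusion::physical_plan::file_format::partition_type_wrap;

    fn dictionary_wrapping() {
        // The physical representation stays Dictionary(UInt16, _); only the
        // value type now follows the declared partition column type.
        assert_eq!(
            partition_type_wrap(DataType::Int32),
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32))
        );
    }
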
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index fa68f3072..cfd1c2194 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -725,6 +725,7 @@ mod tests {
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::options::CsvReadOptions;
     use crate::physical_plan::displayable;
+    use crate::physical_plan::file_format::partition_type_wrap;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use crate::{
@@ -1441,9 +1442,9 @@ mod tests {
                 projection: Some(vec![0, 1, 2, 12]),
                 limit: None,
                 table_partition_cols: vec![
-                    "year".to_owned(),
-                    "month".to_owned(),
-                    "day".to_owned(),
+                    ("year".to_owned(), partition_type_wrap(DataType::Utf8)),
+                    ("month".to_owned(), partition_type_wrap(DataType::Utf8)),
+                    ("day".to_owned(), partition_type_wrap(DataType::Utf8)),
                 ],
                 config_options: ConfigOptions::new().into_shareable(),
                 output_ordering: None,
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 028ddf14e..41ae75864 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -17,6 +17,7 @@
 
 //! Test queries on partitioned datasets
 
+use arrow::datatypes::DataType;
 use std::fs::File;
 use std::io::{Read, Seek, SeekFrom};
 use std::ops::Range;
@@ -56,7 +57,11 @@ async fn parquet_distinct_partition_col() -> Result<()> {
             "year=2021/month=10/day=09/file.parquet",
             "year=2021/month=10/day=28/file.parquet",
         ],
-        &["year", "month", "day"],
+        &[
+            ("year", DataType::Int32),
+            ("month", DataType::Utf8),
+            ("day", DataType::Utf8),
+        ],
         "mirror:///",
         "alltypes_plain.parquet",
     )
@@ -195,7 +200,7 @@ async fn csv_filter_with_file_col() -> Result<()> {
             "mytable/date=2021-10-27/file.csv",
             "mytable/date=2021-10-28/file.csv",
         ],
-        &["date"],
+        &[("date", DataType::Utf8)],
         "mirror:///mytable/",
     );
 
@@ -221,6 +226,42 @@ async fn csv_filter_with_file_col() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn csv_filter_with_file_nonstring_col() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    register_partitioned_aggregate_csv(
+        &ctx,
+        &[
+            "mytable/date=2021-10-27/file.csv",
+            "mytable/date=2021-10-28/file.csv",
+        ],
+        &[("date", DataType::Date32)],
+        "mirror:///mytable/",
+    );
+
+    let result = ctx
+        .sql("SELECT c1, c2, date FROM t WHERE date > '2021-10-27' LIMIT 5")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+----+----+------------+",
+        "| c1 | c2 | date       |",
+        "+----+----+------------+",
+        "| a  | 1  | 2021-10-28 |",
+        "| b  | 1  | 2021-10-28 |",
+        "| b  | 5  | 2021-10-28 |",
+        "| c  | 2  | 2021-10-28 |",
+        "| d  | 5  | 2021-10-28 |",
+        "+----+----+------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn csv_projection_on_partition() -> Result<()> {
     let ctx = SessionContext::new();
@@ -231,7 +272,7 @@ async fn csv_projection_on_partition() -> Result<()> {
             "mytable/date=2021-10-27/file.csv",
             "mytable/date=2021-10-28/file.csv",
         ],
-        &["date"],
+        &[("date", DataType::Date32)],
         "mirror:///mytable/",
     );
 
@@ -268,7 +309,7 @@ async fn csv_grouping_by_partition() -> Result<()> {
             "mytable/date=2021-10-27/file.csv",
             "mytable/date=2021-10-28/file.csv",
         ],
-        &["date"],
+        &[("date", DataType::Date32)],
         "mirror:///mytable/",
     );
 
@@ -302,7 +343,11 @@ async fn parquet_multiple_partitions() -> Result<()> {
             "year=2021/month=10/day=09/file.parquet",
             "year=2021/month=10/day=28/file.parquet",
         ],
-        &["year", "month", "day"],
+        &[
+            ("year", DataType::Utf8),
+            ("month", DataType::Utf8),
+            ("day", DataType::Utf8),
+        ],
         "mirror:///",
         "alltypes_plain.parquet",
     )
@@ -333,6 +378,52 @@ async fn parquet_multiple_partitions() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn parquet_multiple_nonstring_partitions() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    register_partitioned_alltypes_parquet(
+        &ctx,
+        &[
+            "year=2021/month=09/day=09/file.parquet",
+            "year=2021/month=10/day=09/file.parquet",
+            "year=2021/month=10/day=28/file.parquet",
+        ],
+        &[
+            ("year", DataType::Int32),
+            ("month", DataType::Int32),
+            ("day", DataType::Int32),
+        ],
+        "mirror:///",
+        "alltypes_plain.parquet",
+    )
+    .await;
+
+    let result = ctx
+        .sql("SELECT id, day FROM t WHERE day=month ORDER BY id")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+----+-----+",
+        "| id | day |",
+        "+----+-----+",
+        "| 0  | 9   |",
+        "| 1  | 9   |",
+        "| 2  | 9   |",
+        "| 3  | 9   |",
+        "| 4  | 9   |",
+        "| 5  | 9   |",
+        "| 6  | 9   |",
+        "| 7  | 9   |",
+        "+----+-----+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn parquet_statistics() -> Result<()> {
     let ctx = SessionContext::new();
@@ -344,7 +435,11 @@ async fn parquet_statistics() -> Result<()> {
             "year=2021/month=10/day=09/file.parquet",
             "year=2021/month=10/day=28/file.parquet",
         ],
-        &["year", "month", "day"],
+        &[
+            ("year", DataType::Int32),
+            ("month", DataType::Utf8),
+            ("day", DataType::Utf8),
+        ],
         "mirror:///",
         // This is the only file we found in the test set with
         // actual stats. It has 1 column / 1 row.
@@ -404,7 +499,7 @@ async fn parquet_overlapping_columns() -> Result<()> {
             "id=2/file.parquet",
             "id=3/file.parquet",
         ],
-        &["id"],
+        &[("id", DataType::Int64)],
         "mirror:///",
         "alltypes_plain.parquet",
     )
@@ -422,7 +517,7 @@ async fn parquet_overlapping_columns() -> Result<()> {
 fn register_partitioned_aggregate_csv(
     ctx: &SessionContext,
     store_paths: &[&str],
-    partition_cols: &[&str],
+    partition_cols: &[(&str, DataType)],
     table_path: &str,
 ) {
     let testdata = arrow_test_data();
@@ -436,7 +531,10 @@ fn register_partitioned_aggregate_csv(
 
     let options = ListingOptions::new(Arc::new(CsvFormat::default()))
         .with_table_partition_cols(
-            partition_cols.iter().map(|&s| s.to_owned()).collect(),
+            partition_cols
+                .iter()
+                .map(|x| (x.0.to_owned(), x.1.clone()))
+                .collect::<Vec<_>>(),
         );
 
     let table_path = ListingTableUrl::parse(table_path).unwrap();
@@ -452,7 +550,7 @@ fn register_partitioned_aggregate_csv(
 async fn register_partitioned_alltypes_parquet(
     ctx: &SessionContext,
     store_paths: &[&str],
-    partition_cols: &[&str],
+    partition_cols: &[(&str, DataType)],
     table_path: &str,
     source_file: &str,
 ) {
@@ -466,7 +564,10 @@ async fn register_partitioned_alltypes_parquet(
 
     let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
         .with_table_partition_cols(
-            partition_cols.iter().map(|&s| s.to_owned()).collect(),
+            partition_cols
+                .iter()
+                .map(|x| (x.0.to_owned(), x.1.clone()))
+                .collect::<Vec<_>>(),
         );
 
     let table_path = ListingTableUrl::parse(table_path).unwrap();
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 452f41b7d..fa2cbc00d 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fs::create_dir_all;
 use std::io::Write;
 
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
@@ -341,6 +342,73 @@ async fn create_external_table_with_timestamps() {
     assert_batches_sorted_eq!(expected, &result);
 }
 
+#[tokio::test]
+async fn create_partitioned_external_table() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let data0 = "Jorge,2018-12-13T12:12:10.011Z";
+    let data1 = "Andrew,2018-11-13T17:11:10.011Z";
+
+    let tmp_dir = TempDir::new().unwrap();
+
+    // scope to ensure the file is closed and written
+    {
+        let part0 = tmp_dir.path().join("c_date=2018-12-13");
+        let file_path0 = part0.join("timestamps.csv");
+        create_dir_all(part0).expect("creating part dir");
+        File::options()
+            .read(true)
+            .write(true)
+            .create(true)
+            .open(&file_path0)
+            .expect("creating temp file")
+            .write_all(data0.as_bytes())
+            .expect("writing data");
+
+        let part1 = tmp_dir.path().join("c_date=2018-11-13");
+        let file_path1 = part1.join("timestamps.csv");
+        create_dir_all(part1).expect("creating part dir");
+        File::options()
+            .read(true)
+            .write(true)
+            .create(true)
+            .open(&file_path1)
+            .expect("creating temp file")
+            .write_all(data1.as_bytes())
+            .expect("writing data");
+    }
+
+    let sql = format!(
+        "CREATE EXTERNAL TABLE csv_with_timestamps (
+                  name VARCHAR,
+                  ts TIMESTAMP,
+                  c_date DATE,
+              )
+              STORED AS CSV
+              PARTITIONED BY (c_date)
+              LOCATION '{}'
+              ",
+        tmp_dir.path().to_str().expect("path is utf8")
+    );
+
+    plan_and_collect(&ctx, &sql)
+        .await
+        .expect("Executing CREATE EXTERNAL TABLE");
+
+    let sql = "SELECT * from csv_with_timestamps where c_date='2018-11-13'";
+    let result = plan_and_collect(&ctx, sql).await.unwrap();
+    let expected = vec![
+        "+--------+-------------------------+------------+",
+        "| name   | ts                      | c_date     |",
+        "+--------+-------------------------+------------+",
+        "| Andrew | 2018-11-13T17:11:10.011 | 2018-11-13 |",
+        "+--------+-------------------------+------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
 #[tokio::test]
 #[should_panic(expected = "already exists")]
 async fn sql_create_duplicate_table() {
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 499e92911..42d3401ef 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -461,7 +461,21 @@ impl AsLogicalPlan for LogicalPlanNode {
 
                 let options = ListingOptions::new(file_format)
                     .with_file_extension(scan.file_extension.clone())
-                    .with_table_partition_cols(scan.table_partition_cols.clone())
+                    .with_table_partition_cols(
+                        scan.table_partition_cols
+                            .iter()
+                            .map(|col| {
+                                (
+                                    col.clone(),
+                                    schema
+                                        .field_with_name(col)
+                                        .unwrap()
+                                        .data_type()
+                                        .clone(),
+                                )
+                            })
+                            .collect(),
+                    )
                     .with_collect_stat(scan.collect_stat)
                     .with_target_partitions(scan.target_partitions as usize)
                     .with_file_sort_order(file_sort_order);
@@ -876,7 +890,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         FileFormatType::Avro(protobuf::AvroFormat {})
                     } else {
                         return Err(proto_error(format!(
-                            "Error converting file format, {:?} is invalid as a datafusion foramt.",
+                            "Error converting file format, {:?} is invalid as a datafusion format.",
                             listing_table.options().format
                         )));
                     };
@@ -901,7 +915,9 @@ impl AsLogicalPlan for LogicalPlanNode {
                                 file_extension: options.file_extension.clone(),
                                 table_partition_cols: options
                                     .table_partition_cols
-                                    .clone(),
+                                    .iter()
+                                    .map(|x| x.0.clone())
+                                    .collect::<Vec<_>>(),
                                 paths: listing_table
                                     .table_paths()
                                     .iter()