Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/03 10:39:58 UTC

[arrow-datafusion] branch master updated: feat: 2061 create external table ddl table partition cols (#2099)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d54ba4e  feat: 2061 create external table ddl table partition cols (#2099)
d54ba4e is described below

commit d54ba4e648bdbe6f7d19f6bd66e5048a796690fc
Author: Rich <jy...@users.noreply.github.com>
AuthorDate: Sun Apr 3 06:39:52 2022 -0400

    feat: 2061 create external table ddl table partition cols (#2099)
    
    * #2061 support "PARTITIONED BY" in CreateExternalTable DDL for DataFusion
    
    * support table_partition_cols in ballista and add ParquetReadOptions
    
    * fix a few usages of read_parquet
    
    * fix CsvReadOptions clone due to removing the Copy trait
    
    * fix "missing documentation for a struct field"
    
    * fix a few usages of register_parquet
    
    * Allow ParquetReadOptions to receive parquet_pruning from execution::context::SessionConfig
    
    https://github.com/apache/arrow-datafusion/blob/73ea6e16f5c8f34526c01490a5ec277a68f33791/datafusion/tests/parquet_pruning.rs#L143
    
    * fix benches import
    
    * Apply suggestions from code review (lint)
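    
    For reference, a minimal sketch of the DDL and API call this change enables,
    issued through `SessionContext::sql`; the table name, path, and partition
    columns are illustrative, not taken from this commit:
    
    ```rust
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;
    
    // Register a Hive-partitioned directory tree as an external table
    // using the new PARTITIONED BY clause (illustrative names and path).
    async fn create_partitioned_table(ctx: &SessionContext) -> Result<()> {
        ctx.sql(
            "CREATE EXTERNAL TABLE taxi \
             STORED AS PARQUET \
             PARTITIONED BY (year, month) \
             LOCATION '/mnt/nyctaxi'",
        )
        .await?;
        Ok(())
    }
    ```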
---
 ballista-examples/src/bin/ballista-dataframe.rs    |   4 +-
 ballista/rust/client/src/context.rs                |  46 ++++++++--
 ballista/rust/core/proto/ballista.proto            |   1 +
 ballista/rust/core/src/serde/logical_plan/mod.rs   |   6 ++
 benchmarks/src/bin/nyctaxi.rs                      |   7 +-
 benchmarks/src/bin/tpch.rs                         |   2 +-
 datafusion-examples/examples/dataframe.rs          |   2 +-
 datafusion-examples/examples/flight_server.rs      |   1 +
 datafusion-examples/examples/parquet_sql.rs        |   1 +
 datafusion/core/benches/parquet_query_sql.rs       |  10 +-
 datafusion/core/src/execution/context.rs           |  51 +++++++----
 datafusion/core/src/execution/options.rs           | 102 +++++++++++++++++++--
 datafusion/core/src/logical_plan/builder.rs        |  18 +---
 datafusion/core/src/logical_plan/plan.rs           |   2 +
 .../core/src/physical_plan/file_format/csv.rs      |   8 +-
 .../core/src/physical_plan/file_format/parquet.rs  |  35 +++++--
 datafusion/core/src/physical_plan/planner.rs       |   6 +-
 datafusion/core/src/prelude.rs                     |   5 +-
 datafusion/core/src/sql/parser.rs                  |  66 +++++++++++++
 datafusion/core/src/sql/planner.rs                 |   2 +
 datafusion/core/tests/parquet_pruning.rs           |   6 +-
 datafusion/core/tests/sql/mod.rs                   |   1 +
 datafusion/core/tests/sql/parquet.rs               |  16 +++-
 docs/source/user-guide/sql/ddl.md                  |  15 +++
 24 files changed, 330 insertions(+), 83 deletions(-)

diff --git a/ballista-examples/src/bin/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs
index cab8f56..a819950 100644
--- a/ballista-examples/src/bin/ballista-dataframe.rs
+++ b/ballista-examples/src/bin/ballista-dataframe.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use ballista::prelude::*;
-use datafusion::prelude::{col, lit};
+use datafusion::prelude::{col, lit, ParquetReadOptions};
 
 /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
 /// fetching results, using the DataFrame trait
@@ -33,7 +33,7 @@ async fn main() -> Result<()> {
 
     // define the query using the DataFrame trait
     let df = ctx
-        .read_parquet(filename)
+        .read_parquet(filename, ParquetReadOptions::default())
         .await?
         .select_columns(&["id", "bool_col", "timestamp_col"])?
         .filter(col("id").gt(lit(1)))?;
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index e4233d4..0a002e8 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -36,7 +36,7 @@ use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
 use datafusion::prelude::{
-    AvroReadOptions, CsvReadOptions, SessionConfig, SessionContext,
+    AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext,
 };
 use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};
 
@@ -221,13 +221,17 @@ impl BallistaContext {
 
     /// Create a DataFrame representing a Parquet table scan
     /// TODO fetch schema from scheduler instead of resolving locally
-    pub async fn read_parquet(&self, path: &str) -> Result<Arc<DataFrame>> {
+    pub async fn read_parquet(
+        &self,
+        path: &str,
+        options: ParquetReadOptions<'_>,
+    ) -> Result<Arc<DataFrame>> {
         // convert to absolute path because the executor likely has a different working directory
         let path = PathBuf::from(path);
         let path = fs::canonicalize(&path)?;
 
         let ctx = self.context.clone();
-        let df = ctx.read_parquet(path.to_str().unwrap()).await?;
+        let df = ctx.read_parquet(path.to_str().unwrap(), options).await?;
         Ok(df)
     }
 
@@ -272,8 +276,13 @@ impl BallistaContext {
         }
     }
 
-    pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
-        match self.read_parquet(path).await?.to_logical_plan() {
+    pub async fn register_parquet(
+        &self,
+        name: &str,
+        path: &str,
+        options: ParquetReadOptions<'_>,
+    ) -> Result<()> {
+        match self.read_parquet(path, options).await?.to_logical_plan() {
             LogicalPlan::TableScan(TableScan { source, .. }) => {
                 self.register_table(name, source)
             }
@@ -366,6 +375,7 @@ impl BallistaContext {
                 ref location,
                 ref file_type,
                 ref has_header,
+                ref table_partition_cols,
             }) => match file_type {
                 FileType::CSV => {
                     self.register_csv(
@@ -373,18 +383,30 @@ impl BallistaContext {
                         location,
                         CsvReadOptions::new()
                             .schema(&schema.as_ref().to_owned().into())
-                            .has_header(*has_header),
+                            .has_header(*has_header)
+                            .table_partition_cols(table_partition_cols.to_vec()),
                     )
                     .await?;
                     Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
                 }
                 FileType::Parquet => {
-                    self.register_parquet(name, location).await?;
+                    self.register_parquet(
+                        name,
+                        location,
+                        ParquetReadOptions::default()
+                            .table_partition_cols(table_partition_cols.to_vec()),
+                    )
+                    .await?;
                     Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
                 }
                 FileType::Avro => {
-                    self.register_avro(name, location, AvroReadOptions::default())
-                        .await?;
+                    self.register_avro(
+                        name,
+                        location,
+                        AvroReadOptions::default()
+                            .table_partition_cols(table_partition_cols.to_vec()),
+                    )
+                    .await?;
                     Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
                 }
                 _ => Err(DataFusionError::NotImplemented(format!(
@@ -525,7 +547,11 @@ mod tests {
 
         let testdata = datafusion::test_util::parquet_test_data();
         context
-            .register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
+            .register_parquet(
+                "single_nan",
+                &format!("{}/single_nan.parquet", testdata),
+                ParquetReadOptions::default(),
+            )
             .await
             .unwrap();
 
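A short usage sketch of the updated `BallistaContext::register_parquet` shown above, assuming an already-connected context; the table name, path, and partition columns are illustrative:

```rust
use ballista::prelude::*;
use datafusion::prelude::ParquetReadOptions;

// Register a partitioned Parquet directory through the new
// three-argument register_parquet (illustrative names and path).
async fn register_tripdata(ctx: &BallistaContext) -> datafusion::error::Result<()> {
    ctx.register_parquet(
        "tripdata",
        "/mnt/nyctaxi",
        ParquetReadOptions::default()
            .table_partition_cols(vec!["year".to_string(), "month".to_string()]),
    )
    .await
}
```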
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 015618f..7f96a00 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -146,6 +146,7 @@ message CreateExternalTableNode {
   FileType file_type = 3;
   bool has_header = 4;
   datafusion.DfSchema schema = 5;
+  repeated string table_partition_cols = 6;
 }
 
 message CreateCatalogSchemaNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 05f3966..3aed2db 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -325,6 +325,9 @@ impl AsLogicalPlan for LogicalPlanNode {
                     location: create_extern_table.location.clone(),
                     file_type: pb_file_type.into(),
                     has_header: create_extern_table.has_header,
+                    table_partition_cols: create_extern_table
+                        .table_partition_cols
+                        .clone(),
                 }))
             }
             LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
@@ -771,6 +774,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 file_type,
                 has_header,
                 schema: df_schema,
+                table_partition_cols,
             }) => {
                 use datafusion::sql::parser::FileType;
 
@@ -789,6 +793,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                             file_type: pb_file_type as i32,
                             has_header: *has_header,
                             schema: Some(df_schema.into()),
+                            table_partition_cols: table_partition_cols.clone(),
                         },
                     )),
                 })
@@ -1123,6 +1128,7 @@ mod roundtrip_tests {
                     location: String::from("employee.csv"),
                     file_type: *file,
                     has_header: true,
+                    table_partition_cols: vec![],
                 });
 
             roundtrip_test!(create_table_node);
diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs
index a0cdb74..e22c71e 100644
--- a/benchmarks/src/bin/nyctaxi.rs
+++ b/benchmarks/src/bin/nyctaxi.rs
@@ -29,7 +29,7 @@ use datafusion::error::Result;
 use datafusion::execution::context::{SessionConfig, SessionContext};
 
 use datafusion::physical_plan::collect;
-use datafusion::prelude::CsvReadOptions;
+use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -82,7 +82,10 @@ async fn main() -> Result<()> {
             let options = CsvReadOptions::new().schema(&schema).has_header(true);
             ctx.register_csv("tripdata", path, options).await?
         }
-        "parquet" => ctx.register_parquet("tripdata", path).await?,
+        "parquet" => {
+            ctx.register_parquet("tripdata", path, ParquetReadOptions::default())
+                .await?
+        }
         other => {
             println!("Invalid file format '{}'", other);
             process::exit(-1);
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index d337857..1060bd2 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -559,7 +559,7 @@ async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
             }
             "parquet" => {
                 let path = format!("{}/{}", path, table);
-                ctx.register_parquet(table, &path)
+                ctx.register_parquet(table, &path, ParquetReadOptions::default())
                     .await
                     .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
                     .unwrap();
diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs
index 982f1b4..5cdec9b 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -31,7 +31,7 @@ async fn main() -> Result<()> {
 
     // define the query using the DataFrame trait
     let df = ctx
-        .read_parquet(filename)
+        .read_parquet(filename, ParquetReadOptions::default())
         .await?
         .select_columns(&["id", "bool_col", "timestamp_col"])?
         .filter(col("id").gt(lit(1)))?;
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index fa27782..703cb70 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -98,6 +98,7 @@ impl FlightService for FlightServiceImpl {
                 ctx.register_parquet(
                     "alltypes_plain",
                     &format!("{}/alltypes_plain.parquet", testdata),
+                    ParquetReadOptions::default(),
                 )
                 .await
                 .map_err(to_tonic_err)?;
diff --git a/datafusion-examples/examples/parquet_sql.rs b/datafusion-examples/examples/parquet_sql.rs
index 9d3dd49..bcaa05d 100644
--- a/datafusion-examples/examples/parquet_sql.rs
+++ b/datafusion-examples/examples/parquet_sql.rs
@@ -31,6 +31,7 @@ async fn main() -> Result<()> {
     ctx.register_parquet(
         "alltypes_plain",
         &format!("{}/alltypes_plain.parquet", testdata),
+        ParquetReadOptions::default(),
     )
     .await?;
 
diff --git a/datafusion/core/benches/parquet_query_sql.rs b/datafusion/core/benches/parquet_query_sql.rs
index 5416e2c..08156fa 100644
--- a/datafusion/core/benches/parquet_query_sql.rs
+++ b/datafusion/core/benches/parquet_query_sql.rs
@@ -24,7 +24,7 @@ use arrow::datatypes::{
 };
 use arrow::record_batch::RecordBatch;
 use criterion::{criterion_group, criterion_main, Criterion};
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{WriterProperties, WriterVersion};
 use rand::distributions::uniform::SampleUniform;
@@ -196,8 +196,12 @@ fn criterion_benchmark(c: &mut Criterion) {
     let context = SessionContext::new();
 
     let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
-    rt.block_on(context.register_parquet("t", file_path.as_str()))
-        .unwrap();
+    rt.block_on(context.register_parquet(
+        "t",
+        file_path.as_str(),
+        ParquetReadOptions::default(),
+    ))
+    .unwrap();
 
     // We read the queries from a file so they can be changed without recompiling the benchmark
     let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index aa17d57..ab65d28 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -92,7 +92,9 @@ use chrono::{DateTime, Utc};
 use parquet::file::properties::WriterProperties;
 use uuid::Uuid;
 
-use super::options::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions};
+use super::options::{
+    AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
+};
 
 /// The default catalog name - this impacts what SQL queries use if not specified
 const DEFAULT_CATALOG: &str = "datafusion";
@@ -215,6 +217,7 @@ impl SessionContext {
                 ref location,
                 ref file_type,
                 ref has_header,
+                ref table_partition_cols,
             }) => {
                 let (file_format, file_extension) = match file_type {
                     FileType::CSV => (
@@ -241,7 +244,7 @@ impl SessionContext {
                     collect_stat: false,
                     file_extension: file_extension.to_owned(),
                     target_partitions: self.copied_config().target_partitions,
-                    table_partition_cols: vec![],
+                    table_partition_cols: table_partition_cols.clone(),
                 };
 
                 // TODO make schema in CreateExternalTable optional instead of empty
@@ -462,14 +465,23 @@ impl SessionContext {
     }
 
     /// Creates a DataFrame for reading a Parquet data source.
-    pub async fn read_parquet(&self, uri: impl Into<String>) -> Result<Arc<DataFrame>> {
+    pub async fn read_parquet(
+        &self,
+        uri: impl Into<String>,
+        options: ParquetReadOptions<'_>,
+    ) -> Result<Arc<DataFrame>> {
         let uri: String = uri.into();
         let (object_store, path) = self.runtime_env().object_store(&uri)?;
         let target_partitions = self.copied_config().target_partitions;
-        let logical_plan =
-            LogicalPlanBuilder::scan_parquet(object_store, path, None, target_partitions)
-                .await?
-                .build()?;
+        let logical_plan = LogicalPlanBuilder::scan_parquet(
+            object_store,
+            path,
+            options,
+            None,
+            target_partitions,
+        )
+        .await?
+        .build()?;
         Ok(Arc::new(DataFrame::new(self.state.clone(), &logical_plan)))
     }
 
@@ -548,20 +560,19 @@ impl SessionContext {
 
     /// Registers a Parquet data source so that it can be referenced from SQL statements
     /// executed against this context.
-    pub async fn register_parquet(&self, name: &str, uri: &str) -> Result<()> {
-        let (target_partitions, enable_pruning) = {
+    pub async fn register_parquet(
+        &self,
+        name: &str,
+        uri: &str,
+        options: ParquetReadOptions<'_>,
+    ) -> Result<()> {
+        let (target_partitions, parquet_pruning) = {
             let conf = self.copied_config();
             (conf.target_partitions, conf.parquet_pruning)
         };
-        let file_format = ParquetFormat::default().with_enable_pruning(enable_pruning);
-
-        let listing_options = ListingOptions {
-            format: Arc::new(file_format),
-            collect_stat: true,
-            file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
-            target_partitions,
-            table_partition_cols: vec![],
-        };
+        let listing_options = options
+            .parquet_pruning(parquet_pruning)
+            .to_listing_options(target_partitions);
 
         self.register_listing_table(name, uri, listing_options, None)
             .await?;
@@ -3510,7 +3521,9 @@ mod tests {
 
         async fn call_read_parquet(&self) -> Arc<DataFrame> {
             let ctx = SessionContext::new();
-            ctx.read_parquet("dummy").await.unwrap()
+            ctx.read_parquet("dummy", ParquetReadOptions::default())
+                .await
+                .unwrap()
         }
     }
 }
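A usage sketch of the updated `SessionContext::read_parquet`, based on the signature above; the path and option values are illustrative:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{DataFrame, ParquetReadOptions, SessionContext};

// Read a partitioned Parquet directory into a DataFrame, opting in to
// predicate pruning (illustrative path and partition columns).
async fn read_partitioned(ctx: &SessionContext) -> Result<Arc<DataFrame>> {
    ctx.read_parquet(
        "/mnt/nyctaxi",
        ParquetReadOptions::default()
            .parquet_pruning(true)
            .table_partition_cols(vec!["year".to_string(), "month".to_string()]),
    )
    .await
}
```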
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index d9a0304..b790ca3 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -21,14 +21,18 @@ use std::sync::Arc;
 
 use arrow::datatypes::{Schema, SchemaRef};
 
-use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
 use crate::datasource::{
-    file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
+    file_format::{
+        avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
+        csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
+        json::{JsonFormat, DEFAULT_JSON_EXTENSION},
+        parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
+    },
     listing::ListingOptions,
 };
 
 /// CSV file read option
-#[derive(Copy, Clone)]
+#[derive(Clone)]
 pub struct CsvReadOptions<'a> {
     /// Does the CSV file have a header?
     ///
@@ -43,8 +47,10 @@ pub struct CsvReadOptions<'a> {
     /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
     pub schema_infer_max_records: usize,
     /// File extension; only files with this extension are selected for data input.
-    /// Defaults to ".csv".
+    /// Defaults to DEFAULT_CSV_EXTENSION.
     pub file_extension: &'a str,
+    /// Partition Columns
+    pub table_partition_cols: Vec<String>,
 }
 
 impl<'a> Default for CsvReadOptions<'a> {
@@ -61,7 +67,8 @@ impl<'a> CsvReadOptions<'a> {
             schema: None,
             schema_infer_max_records: 1000,
             delimiter: b',',
-            file_extension: ".csv",
+            file_extension: DEFAULT_CSV_EXTENSION,
+            table_partition_cols: vec![],
         }
     }
 
@@ -97,6 +104,12 @@ impl<'a> CsvReadOptions<'a> {
         self
     }
 
+    /// Specify table_partition_cols for partition pruning
+    pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
     /// Configure number of max records to read for schema inference
     pub fn schema_infer_max_records(mut self, max_records: usize) -> Self {
         self.schema_infer_max_records = max_records;
@@ -115,7 +128,58 @@ impl<'a> CsvReadOptions<'a> {
             collect_stat: false,
             file_extension: self.file_extension.to_owned(),
             target_partitions,
+            table_partition_cols: self.table_partition_cols.clone(),
+        }
+    }
+}
+
+/// Parquet read options
+#[derive(Clone)]
+pub struct ParquetReadOptions<'a> {
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".parquet".
+    pub file_extension: &'a str,
+    /// Partition Columns
+    pub table_partition_cols: Vec<String>,
+    /// Should the DataFusion parquet reader use the predicate to prune data;
+    /// overridden by the value in execution::context::SessionConfig
+    pub parquet_pruning: bool,
+}
+
+impl<'a> Default for ParquetReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            file_extension: DEFAULT_PARQUET_EXTENSION,
             table_partition_cols: vec![],
+            parquet_pruning: ParquetFormat::default().enable_pruning(),
+        }
+    }
+}
+
+impl<'a> ParquetReadOptions<'a> {
+    /// Specify parquet_pruning
+    pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
+        self.parquet_pruning = parquet_pruning;
+        self
+    }
+
+    /// Specify table_partition_cols for partition pruning
+    pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Helper to convert these user facing options to `ListingTable` options
+    pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
+        let file_format =
+            ParquetFormat::default().with_enable_pruning(self.parquet_pruning);
+
+        ListingOptions {
+            format: Arc::new(file_format),
+            collect_stat: true,
+            file_extension: self.file_extension.to_owned(),
+            target_partitions,
+            table_partition_cols: self.table_partition_cols.clone(),
         }
     }
 }
@@ -127,20 +191,29 @@ pub struct AvroReadOptions<'a> {
     pub schema: Option<SchemaRef>,
 
     /// File extension; only files with this extension are selected for data input.
-    /// Defaults to ".avro".
+    /// Defaults to DEFAULT_AVRO_EXTENSION.
     pub file_extension: &'a str,
+    /// Partition Columns
+    pub table_partition_cols: Vec<String>,
 }
 
 impl<'a> Default for AvroReadOptions<'a> {
     fn default() -> Self {
         Self {
             schema: None,
-            file_extension: ".avro",
+            file_extension: DEFAULT_AVRO_EXTENSION,
+            table_partition_cols: vec![],
         }
     }
 }
 
 impl<'a> AvroReadOptions<'a> {
+    /// Specify table_partition_cols for partition pruning
+    pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
     /// Helper to convert these user facing options to `ListingTable` options
     pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
         let file_format = AvroFormat::default();
@@ -150,7 +223,7 @@ impl<'a> AvroReadOptions<'a> {
             collect_stat: false,
             file_extension: self.file_extension.to_owned(),
             target_partitions,
-            table_partition_cols: vec![],
+            table_partition_cols: self.table_partition_cols.clone(),
         }
     }
 }
@@ -165,8 +238,10 @@ pub struct NdJsonReadOptions<'a> {
     pub schema_infer_max_records: usize,
 
     /// File extension; only files with this extension are selected for data input.
-    /// Defaults to ".json".
+    /// Defaults to DEFAULT_JSON_EXTENSION.
     pub file_extension: &'a str,
+    /// Partition Columns
+    pub table_partition_cols: Vec<String>,
 }
 
 impl<'a> Default for NdJsonReadOptions<'a> {
@@ -175,11 +250,18 @@ impl<'a> Default for NdJsonReadOptions<'a> {
             schema: None,
             schema_infer_max_records: 1000,
             file_extension: DEFAULT_JSON_EXTENSION,
+            table_partition_cols: vec![],
         }
     }
 }
 
 impl<'a> NdJsonReadOptions<'a> {
+    /// Specify table_partition_cols for partition pruning
+    pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
     /// Helper to convert these user facing options to `ListingTable` options
     pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
         let file_format = JsonFormat::default();
@@ -188,7 +270,7 @@ impl<'a> NdJsonReadOptions<'a> {
             collect_stat: false,
             file_extension: self.file_extension.to_owned(),
             target_partitions,
-            table_partition_cols: vec![],
+            table_partition_cols: self.table_partition_cols.clone(),
         }
     }
 }
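The new `ParquetReadOptions` follows the same pattern as the other read-options structs: builder-style setters plus a `to_listing_options` conversion. A small sketch of that flow, with illustrative values:

```rust
use datafusion::prelude::ParquetReadOptions;

// Build options, then convert them into ListingOptions for a
// ListingTable with 8 target partitions (illustrative values).
fn options_to_listing() {
    let options = ParquetReadOptions::default()
        .parquet_pruning(false)
        .table_partition_cols(vec!["year".to_string()]);

    let listing = options.to_listing_options(8);
    assert_eq!(listing.file_extension, ".parquet");
    assert_eq!(listing.table_partition_cols, vec!["year".to_string()]);
}
```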
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index 5a1659d..d2f5c04 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -19,8 +19,7 @@
 
 use crate::datasource::{
     empty::EmptyTable,
-    file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
-    listing::{ListingOptions, ListingTable, ListingTableConfig},
+    listing::{ListingTable, ListingTableConfig},
     MemTable, TableProvider,
 };
 use crate::error::{DataFusionError, Result};
@@ -256,6 +255,7 @@ impl LogicalPlanBuilder {
     pub async fn scan_parquet(
         object_store: Arc<dyn ObjectStore>,
         path: impl Into<String>,
+        options: ParquetReadOptions<'_>,
         projection: Option<Vec<usize>>,
         target_partitions: usize,
     ) -> Result<Self> {
@@ -263,6 +263,7 @@ impl LogicalPlanBuilder {
         Self::scan_parquet_with_name(
             object_store,
             path.clone(),
+            options,
             projection,
             target_partitions,
             path,
@@ -274,21 +275,12 @@ impl LogicalPlanBuilder {
     pub async fn scan_parquet_with_name(
         object_store: Arc<dyn ObjectStore>,
         path: impl Into<String>,
+        options: ParquetReadOptions<'_>,
         projection: Option<Vec<usize>>,
         target_partitions: usize,
         table_name: impl Into<String>,
     ) -> Result<Self> {
-        // TODO remove hard coded enable_pruning
-        let file_format = ParquetFormat::default().with_enable_pruning(true);
-
-        let listing_options = ListingOptions {
-            format: Arc::new(file_format),
-            collect_stat: true,
-            file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
-            target_partitions,
-            table_partition_cols: vec![],
-        };
-
+        let listing_options = options.to_listing_options(target_partitions);
         let path: String = path.into();
 
         // with parquet we resolve the schema in all cases
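With the hard-coded pruning removed, callers of `LogicalPlanBuilder::scan_parquet` pass options explicitly. A sketch, assuming `LocalFileSystem` lives at `datafusion::datasource::object_store::local` as in the test code elsewhere in this diff; the path and partition count are illustrative:

```rust
use std::sync::Arc;

use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::error::Result;
use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion::prelude::ParquetReadOptions;

// Build a Parquet scan plan with explicit read options
// (illustrative path; 8 target partitions).
async fn build_parquet_scan() -> Result<LogicalPlan> {
    LogicalPlanBuilder::scan_parquet(
        Arc::new(LocalFileSystem {}),
        "/mnt/nyctaxi",
        ParquetReadOptions::default(),
        None, // no projection
        8,
    )
    .await?
    .build()
}
```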
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 3acb69e..eeb06d5 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -184,6 +184,8 @@ pub struct CreateExternalTable {
     pub file_type: FileType,
     /// Whether the CSV file contains a header
     pub has_header: bool,
+    /// Partition Columns
+    pub table_partition_cols: Vec<String>,
 }
 
 /// Creates a schema.
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index c3bf7f9..9791b2e 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -487,8 +487,12 @@ mod tests {
 
         // register each partition as well as the top level dir
         let csv_read_option = CsvReadOptions::new().schema(&schema);
-        ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
-            .await?;
+        ctx.register_csv(
+            "part0",
+            &format!("{}/part-0.csv", out_dir),
+            csv_read_option.clone(),
+        )
+        .await?;
         ctx.register_csv("allparts", &out_dir, csv_read_option)
             .await?;
 
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 6a0a619..310c5e7 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -595,7 +595,7 @@ mod tests {
 
     use super::*;
     use crate::execution::options::CsvReadOptions;
-    use crate::prelude::{SessionConfig, SessionContext};
+    use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use arrow::array::Float32Array;
     use arrow::{
         array::{Int64Array, Int8Array, StringArray},
@@ -1331,15 +1331,32 @@ mod tests {
         let ctx = SessionContext::new();
 
         // register each partition as well as the top level dir
-        ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir))
-            .await?;
-        ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir))
-            .await?;
-        ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir))
-            .await?;
-        ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))
+        ctx.register_parquet(
+            "part0",
+            &format!("{}/part-0.parquet", out_dir),
+            ParquetReadOptions::default(),
+        )
+        .await?;
+        ctx.register_parquet(
+            "part1",
+            &format!("{}/part-1.parquet", out_dir),
+            ParquetReadOptions::default(),
+        )
+        .await?;
+        ctx.register_parquet(
+            "part2",
+            &format!("{}/part-2.parquet", out_dir),
+            ParquetReadOptions::default(),
+        )
+        .await?;
+        ctx.register_parquet(
+            "part3",
+            &format!("{}/part-3.parquet", out_dir),
+            ParquetReadOptions::default(),
+        )
+        .await?;
+        ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
             .await?;
-        ctx.register_parquet("allparts", &out_dir).await?;
 
         let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
         let allparts = ctx
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 7504dd4..4dbaca2 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1586,7 +1586,7 @@ mod tests {
             let logical_plan = LogicalPlanBuilder::scan_csv(
                 Arc::new(LocalFileSystem {}),
                 &path,
-                options,
+                options.clone(),
                 None,
                 1,
             )
@@ -1686,7 +1686,7 @@ mod tests {
         let logical_plan = LogicalPlanBuilder::scan_csv(
             Arc::new(LocalFileSystem {}),
             &path,
-            options,
+            options.clone(),
             None,
             1,
         )
@@ -1708,7 +1708,7 @@ mod tests {
         let logical_plan = LogicalPlanBuilder::scan_csv(
             Arc::new(LocalFileSystem {}),
             &path,
-            options,
+            options.clone(),
             None,
             1,
         )
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index e40693e..cd78209 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -27,8 +27,9 @@
 
 pub use crate::dataframe::DataFrame;
 pub use crate::execution::context::{SessionConfig, SessionContext};
-pub use crate::execution::options::AvroReadOptions;
-pub use crate::execution::options::{CsvReadOptions, NdJsonReadOptions};
+pub use crate::execution::options::{
+    AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
+};
 pub use crate::logical_plan::{
     approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr,
     col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest, in_list,
diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/core/src/sql/parser.rs
index 6ad105f..566e91c 100644
--- a/datafusion/core/src/sql/parser.rs
+++ b/datafusion/core/src/sql/parser.rs
@@ -78,6 +78,8 @@ pub struct CreateExternalTable {
     pub has_header: bool,
     /// Path to file
     pub location: String,
+    /// Partition Columns
+    pub table_partition_cols: Vec<String>,
 }
 
 /// DataFusion Statement representations.
@@ -192,6 +194,35 @@ impl<'a> DFParser<'a> {
         }
     }
 
+    fn parse_partitions(&mut self) -> Result<Vec<String>, ParserError> {
+        let mut partitions: Vec<String> = vec![];
+        if !self.parser.consume_token(&Token::LParen)
+            || self.parser.consume_token(&Token::RParen)
+        {
+            return Ok(partitions);
+        }
+
+        loop {
+            if let Token::Word(_) = self.parser.peek_token() {
+                let identifier = self.parser.parse_identifier()?;
+                partitions.push(identifier.to_string());
+            } else {
+                return self.expected("partition name", self.parser.peek_token());
+            }
+            let comma = self.parser.consume_token(&Token::Comma);
+            if self.parser.consume_token(&Token::RParen) {
+                // allow a trailing comma, even though it's not standard SQL
+                break;
+            } else if !comma {
+                return self.expected(
+                    "',' or ')' after partition definition",
+                    self.parser.peek_token(),
+                );
+            }
+        }
+        Ok(partitions)
+    }
+
     // This is a copy of the equivalent implementation in sqlparser.
     fn parse_columns(
         &mut self,
@@ -277,6 +308,12 @@ impl<'a> DFParser<'a> {
 
         let has_header = self.parse_csv_has_header();
 
+        let table_partition_cols = if self.parse_has_partition() {
+            self.parse_partitions()?
+        } else {
+            vec![]
+        };
+
         self.parser.expect_keyword(Keyword::LOCATION)?;
         let location = self.parser.parse_literal_string()?;
 
@@ -286,6 +323,7 @@ impl<'a> DFParser<'a> {
             file_type,
             has_header,
             location,
+            table_partition_cols,
         };
         Ok(Statement::CreateExternalTable(create))
     }
@@ -314,6 +352,11 @@ impl<'a> DFParser<'a> {
             & self.consume_token(&Token::make_keyword("HEADER"))
             & self.consume_token(&Token::make_keyword("ROW"))
     }
+
+    fn parse_has_partition(&mut self) -> bool {
+        self.consume_token(&Token::make_keyword("PARTITIONED"))
+            & self.consume_token(&Token::make_keyword("BY"))
+    }
 }
 
 #[cfg(test)]
@@ -376,6 +419,20 @@ mod tests {
             file_type: FileType::CSV,
             has_header: false,
             location: "foo.csv".into(),
+            table_partition_cols: vec![],
+        });
+        expect_parse_ok(sql, expected)?;
+
+        // positive case: partitioned by
+        let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1, p2) LOCATION 'foo.csv'";
+        let display = None;
+        let expected = Statement::CreateExternalTable(CreateExternalTable {
+            name: "t".into(),
+            columns: vec![make_column_def("c1", DataType::Int(display))],
+            file_type: FileType::CSV,
+            has_header: false,
+            location: "foo.csv".into(),
+            table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
         });
         expect_parse_ok(sql, expected)?;
 
@@ -391,6 +448,7 @@ mod tests {
                 file_type: FileType::CSV,
                 has_header: true,
                 location: "foo.csv".into(),
+                table_partition_cols: vec![],
             });
             expect_parse_ok(sql, expected)?;
         }
@@ -403,6 +461,7 @@ mod tests {
             file_type: FileType::Parquet,
             has_header: false,
             location: "foo.parquet".into(),
+            table_partition_cols: vec![],
         });
         expect_parse_ok(sql, expected)?;
 
@@ -414,6 +473,7 @@ mod tests {
             file_type: FileType::Parquet,
             has_header: false,
             location: "foo.parquet".into(),
+            table_partition_cols: vec![],
         });
         expect_parse_ok(sql, expected)?;
 
@@ -425,6 +485,7 @@ mod tests {
             file_type: FileType::Avro,
             has_header: false,
             location: "foo.avro".into(),
+            table_partition_cols: vec![],
         });
         expect_parse_ok(sql, expected)?;
 
@@ -433,6 +494,11 @@ mod tests {
             "CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'";
         expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV");
 
+        // Error case: a partition column may not specify a data type
+        let sql =
+            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'";
+        expect_parse_error(sql, "sql parser error: Expected ',' or ')' after partition definition, found: int");
+
         Ok(())
     }
 }
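The tests above show the accepted grammar; the same behavior can be exercised directly through `DFParser`. A sketch whose statement text mirrors the test case:

```rust
use datafusion::sql::parser::{DFParser, Statement};

// Parse a CREATE EXTERNAL TABLE with PARTITIONED BY and inspect the
// captured partition columns.
fn parse_partitioned_by() {
    let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV \
               PARTITIONED BY (p1, p2) LOCATION 'foo.csv'";
    let statements = DFParser::parse_sql(sql).unwrap();
    if let Statement::CreateExternalTable(cet) = &statements[0] {
        assert_eq!(cet.table_partition_cols, vec!["p1", "p2"]);
    }
}
```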
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 4409110..b59444e 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -308,6 +308,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             file_type,
             has_header,
             location,
+            table_partition_cols,
         } = statement;
 
         // semantic checks
@@ -333,6 +334,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             location,
             file_type,
             has_header,
+            table_partition_cols,
         }))
     }
 
diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs
index 4301e65..d5392e9 100644
--- a/datafusion/core/tests/parquet_pruning.rs
+++ b/datafusion/core/tests/parquet_pruning.rs
@@ -37,7 +37,7 @@ use datafusion::{
         accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan,
         ExecutionPlanVisitor,
     },
-    prelude::{SessionConfig, SessionContext},
+    prelude::{ParquetReadOptions, SessionConfig, SessionContext},
     scalar::ScalarValue,
 };
 use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
@@ -482,7 +482,9 @@ impl ContextWithParquet {
         // now, setup a the file as a data source and run a query against it
         let ctx = SessionContext::with_config(config);
 
-        ctx.register_parquet("t", &parquet_path).await.unwrap();
+        ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default())
+            .await
+            .unwrap();
         let provider = ctx.deregister_table("t").unwrap().unwrap();
         ctx.register_table("t", provider.clone()).unwrap();
 
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 724fffc..a788316 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -624,6 +624,7 @@ async fn register_alltypes_parquet(ctx: &SessionContext) {
     ctx.register_parquet(
         "alltypes_plain",
         &format!("{}/alltypes_plain.parquet", testdata),
+        ParquetReadOptions::default(),
     )
     .await
     .unwrap();
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 6dfb37e..7c1bb7c 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -52,9 +52,13 @@ async fn parquet_query() {
 async fn parquet_single_nan_schema() {
     let ctx = SessionContext::new();
     let testdata = datafusion::test_util::parquet_test_data();
-    ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
-        .await
-        .unwrap();
+    ctx.register_parquet(
+        "single_nan",
+        &format!("{}/single_nan.parquet", testdata),
+        ParquetReadOptions::default(),
+    )
+    .await
+    .unwrap();
     let sql = "SELECT mycol FROM single_nan";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
@@ -75,6 +79,7 @@ async fn parquet_list_columns() {
     ctx.register_parquet(
         "list_columns",
         &format!("{}/list_columns.parquet", testdata),
+        ParquetReadOptions::default(),
     )
     .await
     .unwrap();
@@ -214,7 +219,10 @@ async fn schema_merge_ignores_metadata() {
     // (no errors)
     let ctx = SessionContext::new();
     let df = ctx
-        .read_parquet(table_dir.to_str().unwrap().to_string())
+        .read_parquet(
+            table_dir.to_str().unwrap().to_string(),
+            ParquetReadOptions::default(),
+        )
         .await
         .unwrap();
     let result = df.collect().await.unwrap();
diff --git a/docs/source/user-guide/sql/ddl.md b/docs/source/user-guide/sql/ddl.md
index eb10f40..75ec0f6 100644
--- a/docs/source/user-guide/sql/ddl.md
+++ b/docs/source/user-guide/sql/ddl.md
@@ -55,6 +55,21 @@ WITH HEADER ROW
 LOCATION '/path/to/aggregate_test_100.csv';
 ```
 
+If data sources are already partitioned in Hive style, `PARTITIONED BY` can be used for partition pruning.
+
+```
+/mnt/nyctaxi/year=2022/month=01/tripdata.parquet
+/mnt/nyctaxi/year=2021/month=12/tripdata.parquet
+/mnt/nyctaxi/year=2021/month=11/tripdata.parquet
+```
+
+```sql
+CREATE EXTERNAL TABLE taxi
+STORED AS PARQUET
+PARTITIONED BY (year, month)
+LOCATION '/mnt/nyctaxi';
+```
+
 ## CREATE MEMORY TABLE
 
 Memory table can be created with query.
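
Once a partitioned table is registered, filters on the partition columns let DataFusion skip non-matching directories entirely. A usage sketch, assuming the `taxi` table from the DDL example above (partition values are read as strings):

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Query the partitioned table; the filter on year/month can prune
// whole directories before any Parquet file is opened.
async fn query_with_pruning(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .sql("SELECT COUNT(*) FROM taxi WHERE year = '2021' AND month = '12'")
        .await?;
    df.show().await?;
    Ok(())
}
```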