Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/03 10:39:58 UTC
[arrow-datafusion] branch master updated: feat: 2061 create external table ddl table partition cols (#2099)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d54ba4e feat: 2061 create external table ddl table partition cols (#2099)
d54ba4e is described below
commit d54ba4e648bdbe6f7d19f6bd66e5048a796690fc
Author: Rich <jy...@users.noreply.github.com>
AuthorDate: Sun Apr 3 06:39:52 2022 -0400
feat: 2061 create external table ddl table partition cols (#2099)
* #2061: support "PARTITIONED BY" in CreateExternalTable DDL for DataFusion (SQL sketch after the changed-file summary below)
* Support table_partition_cols in Ballista and add ParquetReadOptions (usage sketch below)
* Fix a few usages of read_parquet
* Fix CsvReadOptions cloning after removing the Copy trait
* Fix "missing documentation for a struct field" warnings
* Fix a few usages of register_parquet
* Allow ParquetReadOptions to receive parquet_pruning from execution::context::SessionConfig
https://github.com/apache/arrow-datafusion/blob/73ea6e16f5c8f34526c01490a5ec277a68f33791/datafusion/tests/parquet_pruning.rs#L143
* Fix benches imports
* Apply suggestions from code review (lint)
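For readers migrating call sites, here is a minimal sketch of the updated API, assuming a hypothetical local Parquet file and a Hive-style partitioned directory (the paths and column names are illustrative and not part of this patch):

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // read_parquet now takes ParquetReadOptions in addition to the path.
    let df = ctx
        .read_parquet("data/example.parquet", ParquetReadOptions::default())
        .await?;
    df.show().await?;

    // For Hive-style partitioned data, declare the partition columns so
    // they are available for partition pruning.
    ctx.register_parquet(
        "tripdata",
        "/mnt/nyctaxi",
        ParquetReadOptions::default()
            .table_partition_cols(vec!["year".to_string(), "month".to_string()]),
    )
    .await?;

    Ok(())
}
```

Note that CsvReadOptions is no longer Copy, so call sites that reuse the same options now need an explicit .clone(), as the csv.rs and planner.rs hunks below show.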
---
ballista-examples/src/bin/ballista-dataframe.rs | 4 +-
ballista/rust/client/src/context.rs | 46 ++++++++--
ballista/rust/core/proto/ballista.proto | 1 +
ballista/rust/core/src/serde/logical_plan/mod.rs | 6 ++
benchmarks/src/bin/nyctaxi.rs | 7 +-
benchmarks/src/bin/tpch.rs | 2 +-
datafusion-examples/examples/dataframe.rs | 2 +-
datafusion-examples/examples/flight_server.rs | 1 +
datafusion-examples/examples/parquet_sql.rs | 1 +
datafusion/core/benches/parquet_query_sql.rs | 10 +-
datafusion/core/src/execution/context.rs | 51 +++++++----
datafusion/core/src/execution/options.rs | 102 +++++++++++++++++++--
datafusion/core/src/logical_plan/builder.rs | 18 +---
datafusion/core/src/logical_plan/plan.rs | 2 +
.../core/src/physical_plan/file_format/csv.rs | 8 +-
.../core/src/physical_plan/file_format/parquet.rs | 35 +++++--
datafusion/core/src/physical_plan/planner.rs | 6 +-
datafusion/core/src/prelude.rs | 5 +-
datafusion/core/src/sql/parser.rs | 66 +++++++++++++
datafusion/core/src/sql/planner.rs | 2 +
datafusion/core/tests/parquet_pruning.rs | 6 +-
datafusion/core/tests/sql/mod.rs | 1 +
datafusion/core/tests/sql/parquet.rs | 16 +++-
docs/source/user-guide/sql/ddl.md | 15 +++
24 files changed, 330 insertions(+), 83 deletions(-)
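End to end, the new DDL matches the ddl.md example further down: given a Hive-style layout such as /mnt/nyctaxi/year=2021/month=12/tripdata.parquet, the partition columns are declared at table creation time. The SELECT below is an illustrative follow-on query, not part of the patch; filters on the partition columns can then prune non-matching directories:

```sql
CREATE EXTERNAL TABLE taxi
STORED AS PARQUET
PARTITIONED BY (year, month)
LOCATION '/mnt/nyctaxi';

-- Partition columns behave like ordinary string columns in queries;
-- predicates on them let DataFusion skip non-matching directories.
SELECT count(*) FROM taxi WHERE year = '2021' AND month = '12';
```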
diff --git a/ballista-examples/src/bin/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs
index cab8f56..a819950 100644
--- a/ballista-examples/src/bin/ballista-dataframe.rs
+++ b/ballista-examples/src/bin/ballista-dataframe.rs
@@ -16,7 +16,7 @@
// under the License.
use ballista::prelude::*;
-use datafusion::prelude::{col, lit};
+use datafusion::prelude::{col, lit, ParquetReadOptions};
/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame trait
@@ -33,7 +33,7 @@ async fn main() -> Result<()> {
// define the query using the DataFrame trait
let df = ctx
- .read_parquet(filename)
+ .read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index e4233d4..0a002e8 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -36,7 +36,7 @@ use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
use datafusion::prelude::{
- AvroReadOptions, CsvReadOptions, SessionConfig, SessionContext,
+ AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext,
};
use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};
@@ -221,13 +221,17 @@ impl BallistaContext {
/// Create a DataFrame representing a Parquet table scan
/// TODO fetch schema from scheduler instead of resolving locally
- pub async fn read_parquet(&self, path: &str) -> Result<Arc<DataFrame>> {
+ pub async fn read_parquet(
+ &self,
+ path: &str,
+ options: ParquetReadOptions<'_>,
+ ) -> Result<Arc<DataFrame>> {
// convert to absolute path because the executor likely has a different working directory
let path = PathBuf::from(path);
let path = fs::canonicalize(&path)?;
let ctx = self.context.clone();
- let df = ctx.read_parquet(path.to_str().unwrap()).await?;
+ let df = ctx.read_parquet(path.to_str().unwrap(), options).await?;
Ok(df)
}
@@ -272,8 +276,13 @@ impl BallistaContext {
}
}
- pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
- match self.read_parquet(path).await?.to_logical_plan() {
+ pub async fn register_parquet(
+ &self,
+ name: &str,
+ path: &str,
+ options: ParquetReadOptions<'_>,
+ ) -> Result<()> {
+ match self.read_parquet(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
}
@@ -366,6 +375,7 @@ impl BallistaContext {
ref location,
ref file_type,
ref has_header,
+ ref table_partition_cols,
}) => match file_type {
FileType::CSV => {
self.register_csv(
@@ -373,18 +383,30 @@ impl BallistaContext {
location,
CsvReadOptions::new()
.schema(&schema.as_ref().to_owned().into())
- .has_header(*has_header),
+ .has_header(*has_header)
+ .table_partition_cols(table_partition_cols.to_vec()),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
}
FileType::Parquet => {
- self.register_parquet(name, location).await?;
+ self.register_parquet(
+ name,
+ location,
+ ParquetReadOptions::default()
+ .table_partition_cols(table_partition_cols.to_vec()),
+ )
+ .await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
}
FileType::Avro => {
- self.register_avro(name, location, AvroReadOptions::default())
- .await?;
+ self.register_avro(
+ name,
+ location,
+ AvroReadOptions::default()
+ .table_partition_cols(table_partition_cols.to_vec()),
+ )
+ .await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
}
_ => Err(DataFusionError::NotImplemented(format!(
@@ -525,7 +547,11 @@ mod tests {
let testdata = datafusion::test_util::parquet_test_data();
context
- .register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
+ .register_parquet(
+ "single_nan",
+ &format!("{}/single_nan.parquet", testdata),
+ ParquetReadOptions::default(),
+ )
.await
.unwrap();
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 015618f..7f96a00 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -146,6 +146,7 @@ message CreateExternalTableNode {
FileType file_type = 3;
bool has_header = 4;
datafusion.DfSchema schema = 5;
+ repeated string table_partition_cols = 6;
}
message CreateCatalogSchemaNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 05f3966..3aed2db 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -325,6 +325,9 @@ impl AsLogicalPlan for LogicalPlanNode {
location: create_extern_table.location.clone(),
file_type: pb_file_type.into(),
has_header: create_extern_table.has_header,
+ table_partition_cols: create_extern_table
+ .table_partition_cols
+ .clone(),
}))
}
LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
@@ -771,6 +774,7 @@ impl AsLogicalPlan for LogicalPlanNode {
file_type,
has_header,
schema: df_schema,
+ table_partition_cols,
}) => {
use datafusion::sql::parser::FileType;
@@ -789,6 +793,7 @@ impl AsLogicalPlan for LogicalPlanNode {
file_type: pb_file_type as i32,
has_header: *has_header,
schema: Some(df_schema.into()),
+ table_partition_cols: table_partition_cols.clone(),
},
)),
})
@@ -1123,6 +1128,7 @@ mod roundtrip_tests {
location: String::from("employee.csv"),
file_type: *file,
has_header: true,
+ table_partition_cols: vec![],
});
roundtrip_test!(create_table_node);
diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs
index a0cdb74..e22c71e 100644
--- a/benchmarks/src/bin/nyctaxi.rs
+++ b/benchmarks/src/bin/nyctaxi.rs
@@ -29,7 +29,7 @@ use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::physical_plan::collect;
-use datafusion::prelude::CsvReadOptions;
+use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use structopt::StructOpt;
#[cfg(feature = "snmalloc")]
@@ -82,7 +82,10 @@ async fn main() -> Result<()> {
let options = CsvReadOptions::new().schema(&schema).has_header(true);
ctx.register_csv("tripdata", path, options).await?
}
- "parquet" => ctx.register_parquet("tripdata", path).await?,
+ "parquet" => {
+ ctx.register_parquet("tripdata", path, ParquetReadOptions::default())
+ .await?
+ }
other => {
println!("Invalid file format '{}'", other);
process::exit(-1);
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index d337857..1060bd2 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -559,7 +559,7 @@ async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
}
"parquet" => {
let path = format!("{}/{}", path, table);
- ctx.register_parquet(table, &path)
+ ctx.register_parquet(table, &path, ParquetReadOptions::default())
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs
index 982f1b4..5cdec9b 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -31,7 +31,7 @@ async fn main() -> Result<()> {
// define the query using the DataFrame trait
let df = ctx
- .read_parquet(filename)
+ .read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index fa27782..703cb70 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -98,6 +98,7 @@ impl FlightService for FlightServiceImpl {
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
+ ParquetReadOptions::default(),
)
.await
.map_err(to_tonic_err)?;
diff --git a/datafusion-examples/examples/parquet_sql.rs b/datafusion-examples/examples/parquet_sql.rs
index 9d3dd49..bcaa05d 100644
--- a/datafusion-examples/examples/parquet_sql.rs
+++ b/datafusion-examples/examples/parquet_sql.rs
@@ -31,6 +31,7 @@ async fn main() -> Result<()> {
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
+ ParquetReadOptions::default(),
)
.await?;
diff --git a/datafusion/core/benches/parquet_query_sql.rs b/datafusion/core/benches/parquet_query_sql.rs
index 5416e2c..08156fa 100644
--- a/datafusion/core/benches/parquet_query_sql.rs
+++ b/datafusion/core/benches/parquet_query_sql.rs
@@ -24,7 +24,7 @@ use arrow::datatypes::{
};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::distributions::uniform::SampleUniform;
@@ -196,8 +196,12 @@ fn criterion_benchmark(c: &mut Criterion) {
let context = SessionContext::new();
let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
- rt.block_on(context.register_parquet("t", file_path.as_str()))
- .unwrap();
+ rt.block_on(context.register_parquet(
+ "t",
+ file_path.as_str(),
+ ParquetReadOptions::default(),
+ ))
+ .unwrap();
// We read the queries from a file so they can be changed without recompiling the benchmark
let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index aa17d57..ab65d28 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -92,7 +92,9 @@ use chrono::{DateTime, Utc};
use parquet::file::properties::WriterProperties;
use uuid::Uuid;
-use super::options::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions};
+use super::options::{
+ AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
+};
/// The default catalog name - this impacts what SQL queries use if not specified
const DEFAULT_CATALOG: &str = "datafusion";
@@ -215,6 +217,7 @@ impl SessionContext {
ref location,
ref file_type,
ref has_header,
+ ref table_partition_cols,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
@@ -241,7 +244,7 @@ impl SessionContext {
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
- table_partition_cols: vec![],
+ table_partition_cols: table_partition_cols.clone(),
};
// TODO make schema in CreateExternalTable optional instead of empty
@@ -462,14 +465,23 @@ impl SessionContext {
}
/// Creates a DataFrame for reading a Parquet data source.
- pub async fn read_parquet(&self, uri: impl Into<String>) -> Result<Arc<DataFrame>> {
+ pub async fn read_parquet(
+ &self,
+ uri: impl Into<String>,
+ options: ParquetReadOptions<'_>,
+ ) -> Result<Arc<DataFrame>> {
let uri: String = uri.into();
let (object_store, path) = self.runtime_env().object_store(&uri)?;
let target_partitions = self.copied_config().target_partitions;
- let logical_plan =
- LogicalPlanBuilder::scan_parquet(object_store, path, None, target_partitions)
- .await?
- .build()?;
+ let logical_plan = LogicalPlanBuilder::scan_parquet(
+ object_store,
+ path,
+ options,
+ None,
+ target_partitions,
+ )
+ .await?
+ .build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &logical_plan)))
}
@@ -548,20 +560,19 @@ impl SessionContext {
/// Registers a Parquet data source so that it can be referenced from SQL statements
/// executed against this context.
- pub async fn register_parquet(&self, name: &str, uri: &str) -> Result<()> {
- let (target_partitions, enable_pruning) = {
+ pub async fn register_parquet(
+ &self,
+ name: &str,
+ uri: &str,
+ options: ParquetReadOptions<'_>,
+ ) -> Result<()> {
+ let (target_partitions, parquet_pruning) = {
let conf = self.copied_config();
(conf.target_partitions, conf.parquet_pruning)
};
- let file_format = ParquetFormat::default().with_enable_pruning(enable_pruning);
-
- let listing_options = ListingOptions {
- format: Arc::new(file_format),
- collect_stat: true,
- file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
- target_partitions,
- table_partition_cols: vec![],
- };
+ let listing_options = options
+ .parquet_pruning(parquet_pruning)
+ .to_listing_options(target_partitions);
self.register_listing_table(name, uri, listing_options, None)
.await?;
@@ -3510,7 +3521,9 @@ mod tests {
async fn call_read_parquet(&self) -> Arc<DataFrame> {
let ctx = SessionContext::new();
- ctx.read_parquet("dummy").await.unwrap()
+ ctx.read_parquet("dummy", ParquetReadOptions::default())
+ .await
+ .unwrap()
}
}
}
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index d9a0304..b790ca3 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -21,14 +21,18 @@ use std::sync::Arc;
use arrow::datatypes::{Schema, SchemaRef};
-use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::{
- file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
+ file_format::{
+ avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
+ csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
+ json::{JsonFormat, DEFAULT_JSON_EXTENSION},
+ parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
+ },
listing::ListingOptions,
};
/// CSV file read option
-#[derive(Copy, Clone)]
+#[derive(Clone)]
pub struct CsvReadOptions<'a> {
/// Does the CSV file have a header?
///
@@ -43,8 +47,10 @@ pub struct CsvReadOptions<'a> {
/// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
pub schema_infer_max_records: usize,
/// File extension; only files with this extension are selected for data input.
- /// Defaults to ".csv".
+ /// Defaults to DEFAULT_CSV_EXTENSION.
pub file_extension: &'a str,
+ /// Partition Columns
+ pub table_partition_cols: Vec<String>,
}
impl<'a> Default for CsvReadOptions<'a> {
@@ -61,7 +67,8 @@ impl<'a> CsvReadOptions<'a> {
schema: None,
schema_infer_max_records: 1000,
delimiter: b',',
- file_extension: ".csv",
+ file_extension: DEFAULT_CSV_EXTENSION,
+ table_partition_cols: vec![],
}
}
@@ -97,6 +104,12 @@ impl<'a> CsvReadOptions<'a> {
self
}
+ /// Specify table_partition_cols for partition pruning
+ pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+ self.table_partition_cols = table_partition_cols;
+ self
+ }
+
/// Configure number of max records to read for schema inference
pub fn schema_infer_max_records(mut self, max_records: usize) -> Self {
self.schema_infer_max_records = max_records;
@@ -115,7 +128,58 @@ impl<'a> CsvReadOptions<'a> {
collect_stat: false,
file_extension: self.file_extension.to_owned(),
target_partitions,
+ table_partition_cols: self.table_partition_cols.clone(),
+ }
+ }
+}
+
+/// Parquet read options
+#[derive(Clone)]
+pub struct ParquetReadOptions<'a> {
+ /// File extension; only files with this extension are selected for data input.
+ /// Defaults to ".parquet".
+ pub file_extension: &'a str,
+ /// Partition Columns
+ pub table_partition_cols: Vec<String>,
+ /// Whether the DataFusion Parquet reader should use the predicate to prune data;
+ /// this is overridden by the value on execution::context::SessionConfig
+ pub parquet_pruning: bool,
+}
+
+impl<'a> Default for ParquetReadOptions<'a> {
+ fn default() -> Self {
+ Self {
+ file_extension: DEFAULT_PARQUET_EXTENSION,
table_partition_cols: vec![],
+ parquet_pruning: ParquetFormat::default().enable_pruning(),
+ }
+ }
+}
+
+impl<'a> ParquetReadOptions<'a> {
+ /// Specify parquet_pruning
+ pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
+ self.parquet_pruning = parquet_pruning;
+ self
+ }
+
+ /// Specify table_partition_cols for partition pruning
+ pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+ self.table_partition_cols = table_partition_cols;
+ self
+ }
+
+ /// Helper to convert these user facing options to `ListingTable` options
+ pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
+ let file_format =
+ ParquetFormat::default().with_enable_pruning(self.parquet_pruning);
+
+ ListingOptions {
+ format: Arc::new(file_format),
+ collect_stat: true,
+ file_extension: self.file_extension.to_owned(),
+ target_partitions,
+ table_partition_cols: self.table_partition_cols.clone(),
}
}
}
@@ -127,20 +191,29 @@ pub struct AvroReadOptions<'a> {
pub schema: Option<SchemaRef>,
/// File extension; only files with this extension are selected for data input.
- /// Defaults to ".avro".
+ /// Defaults to DEFAULT_AVRO_EXTENSION.
pub file_extension: &'a str,
+ /// Partition Columns
+ pub table_partition_cols: Vec<String>,
}
impl<'a> Default for AvroReadOptions<'a> {
fn default() -> Self {
Self {
schema: None,
- file_extension: ".avro",
+ file_extension: DEFAULT_AVRO_EXTENSION,
+ table_partition_cols: vec![],
}
}
}
impl<'a> AvroReadOptions<'a> {
+ /// Specify table_partition_cols for partition pruning
+ pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+ self.table_partition_cols = table_partition_cols;
+ self
+ }
+
/// Helper to convert these user facing options to `ListingTable` options
pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
let file_format = AvroFormat::default();
@@ -150,7 +223,7 @@ impl<'a> AvroReadOptions<'a> {
collect_stat: false,
file_extension: self.file_extension.to_owned(),
target_partitions,
- table_partition_cols: vec![],
+ table_partition_cols: self.table_partition_cols.clone(),
}
}
}
@@ -165,8 +238,10 @@ pub struct NdJsonReadOptions<'a> {
pub schema_infer_max_records: usize,
/// File extension; only files with this extension are selected for data input.
- /// Defaults to ".json".
+ /// Defaults to DEFAULT_JSON_EXTENSION.
pub file_extension: &'a str,
+ /// Partition Columns
+ pub table_partition_cols: Vec<String>,
}
impl<'a> Default for NdJsonReadOptions<'a> {
@@ -175,11 +250,18 @@ impl<'a> Default for NdJsonReadOptions<'a> {
schema: None,
schema_infer_max_records: 1000,
file_extension: DEFAULT_JSON_EXTENSION,
+ table_partition_cols: vec![],
}
}
}
impl<'a> NdJsonReadOptions<'a> {
+ /// Specify table_partition_cols for partition pruning
+ pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+ self.table_partition_cols = table_partition_cols;
+ self
+ }
+
/// Helper to convert these user facing options to `ListingTable` options
pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
let file_format = JsonFormat::default();
@@ -188,7 +270,7 @@ impl<'a> NdJsonReadOptions<'a> {
collect_stat: false,
file_extension: self.file_extension.to_owned(),
target_partitions,
- table_partition_cols: vec![],
+ table_partition_cols: self.table_partition_cols.clone(),
}
}
}
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index 5a1659d..d2f5c04 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -19,8 +19,7 @@
use crate::datasource::{
empty::EmptyTable,
- file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
- listing::{ListingOptions, ListingTable, ListingTableConfig},
+ listing::{ListingTable, ListingTableConfig},
MemTable, TableProvider,
};
use crate::error::{DataFusionError, Result};
@@ -256,6 +255,7 @@ impl LogicalPlanBuilder {
pub async fn scan_parquet(
object_store: Arc<dyn ObjectStore>,
path: impl Into<String>,
+ options: ParquetReadOptions<'_>,
projection: Option<Vec<usize>>,
target_partitions: usize,
) -> Result<Self> {
@@ -263,6 +263,7 @@ impl LogicalPlanBuilder {
Self::scan_parquet_with_name(
object_store,
path.clone(),
+ options,
projection,
target_partitions,
path,
@@ -274,21 +275,12 @@ impl LogicalPlanBuilder {
pub async fn scan_parquet_with_name(
object_store: Arc<dyn ObjectStore>,
path: impl Into<String>,
+ options: ParquetReadOptions<'_>,
projection: Option<Vec<usize>>,
target_partitions: usize,
table_name: impl Into<String>,
) -> Result<Self> {
- // TODO remove hard coded enable_pruning
- let file_format = ParquetFormat::default().with_enable_pruning(true);
-
- let listing_options = ListingOptions {
- format: Arc::new(file_format),
- collect_stat: true,
- file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
- target_partitions,
- table_partition_cols: vec![],
- };
-
+ let listing_options = options.to_listing_options(target_partitions);
let path: String = path.into();
// with parquet we resolve the schema in all cases
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 3acb69e..eeb06d5 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -184,6 +184,8 @@ pub struct CreateExternalTable {
pub file_type: FileType,
/// Whether the CSV file contains a header
pub has_header: bool,
+ /// Partition Columns
+ pub table_partition_cols: Vec<String>,
}
/// Creates a schema.
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index c3bf7f9..9791b2e 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -487,8 +487,12 @@ mod tests {
// register each partition as well as the top level dir
let csv_read_option = CsvReadOptions::new().schema(&schema);
- ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
- .await?;
+ ctx.register_csv(
+ "part0",
+ &format!("{}/part-0.csv", out_dir),
+ csv_read_option.clone(),
+ )
+ .await?;
ctx.register_csv("allparts", &out_dir, csv_read_option)
.await?;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 6a0a619..310c5e7 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -595,7 +595,7 @@ mod tests {
use super::*;
use crate::execution::options::CsvReadOptions;
- use crate::prelude::{SessionConfig, SessionContext};
+ use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use arrow::array::Float32Array;
use arrow::{
array::{Int64Array, Int8Array, StringArray},
@@ -1331,15 +1331,32 @@ mod tests {
let ctx = SessionContext::new();
// register each partition as well as the top level dir
- ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir))
- .await?;
- ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir))
- .await?;
- ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir))
- .await?;
- ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))
+ ctx.register_parquet(
+ "part0",
+ &format!("{}/part-0.parquet", out_dir),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+ ctx.register_parquet(
+ "part1",
+ &format!("{}/part-1.parquet", out_dir),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+ ctx.register_parquet(
+ "part2",
+ &format!("{}/part-2.parquet", out_dir),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+ ctx.register_parquet(
+ "part3",
+ &format!("{}/part-3.parquet", out_dir),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+ ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
.await?;
- ctx.register_parquet("allparts", &out_dir).await?;
let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
let allparts = ctx
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 7504dd4..4dbaca2 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1586,7 +1586,7 @@ mod tests {
let logical_plan = LogicalPlanBuilder::scan_csv(
Arc::new(LocalFileSystem {}),
&path,
- options,
+ options.clone(),
None,
1,
)
@@ -1686,7 +1686,7 @@ mod tests {
let logical_plan = LogicalPlanBuilder::scan_csv(
Arc::new(LocalFileSystem {}),
&path,
- options,
+ options.clone(),
None,
1,
)
@@ -1708,7 +1708,7 @@ mod tests {
let logical_plan = LogicalPlanBuilder::scan_csv(
Arc::new(LocalFileSystem {}),
&path,
- options,
+ options.clone(),
None,
1,
)
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index e40693e..cd78209 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -27,8 +27,9 @@
pub use crate::dataframe::DataFrame;
pub use crate::execution::context::{SessionConfig, SessionContext};
-pub use crate::execution::options::AvroReadOptions;
-pub use crate::execution::options::{CsvReadOptions, NdJsonReadOptions};
+pub use crate::execution::options::{
+ AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
+};
pub use crate::logical_plan::{
approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr,
col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest, in_list,
diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/core/src/sql/parser.rs
index 6ad105f..566e91c 100644
--- a/datafusion/core/src/sql/parser.rs
+++ b/datafusion/core/src/sql/parser.rs
@@ -78,6 +78,8 @@ pub struct CreateExternalTable {
pub has_header: bool,
/// Path to file
pub location: String,
+ /// Partition Columns
+ pub table_partition_cols: Vec<String>,
}
/// DataFusion Statement representations.
@@ -192,6 +194,35 @@ impl<'a> DFParser<'a> {
}
}
+ fn parse_partitions(&mut self) -> Result<Vec<String>, ParserError> {
+ let mut partitions: Vec<String> = vec![];
+ if !self.parser.consume_token(&Token::LParen)
+ || self.parser.consume_token(&Token::RParen)
+ {
+ return Ok(partitions);
+ }
+
+ loop {
+ if let Token::Word(_) = self.parser.peek_token() {
+ let identifier = self.parser.parse_identifier()?;
+ partitions.push(identifier.to_string());
+ } else {
+ return self.expected("partition name", self.parser.peek_token());
+ }
+ let comma = self.parser.consume_token(&Token::Comma);
+ if self.parser.consume_token(&Token::RParen) {
+ // allow a trailing comma, even though it is not standard SQL
+ break;
+ } else if !comma {
+ return self.expected(
+ "',' or ')' after partition definition",
+ self.parser.peek_token(),
+ );
+ }
+ }
+ Ok(partitions)
+ }
+
// This is a copy of the equivalent implementation in sqlparser.
fn parse_columns(
&mut self,
@@ -277,6 +308,12 @@ impl<'a> DFParser<'a> {
let has_header = self.parse_csv_has_header();
+ let table_partition_cols = if self.parse_has_partition() {
+ self.parse_partitions()?
+ } else {
+ vec![]
+ };
+
self.parser.expect_keyword(Keyword::LOCATION)?;
let location = self.parser.parse_literal_string()?;
@@ -286,6 +323,7 @@ impl<'a> DFParser<'a> {
file_type,
has_header,
location,
+ table_partition_cols,
};
Ok(Statement::CreateExternalTable(create))
}
@@ -314,6 +352,11 @@ impl<'a> DFParser<'a> {
& self.consume_token(&Token::make_keyword("HEADER"))
& self.consume_token(&Token::make_keyword("ROW"))
}
+
+ fn parse_has_partition(&mut self) -> bool {
+ self.consume_token(&Token::make_keyword("PARTITIONED"))
+ & self.consume_token(&Token::make_keyword("BY"))
+ }
}
#[cfg(test)]
@@ -376,6 +419,20 @@ mod tests {
file_type: FileType::CSV,
has_header: false,
location: "foo.csv".into(),
+ table_partition_cols: vec![],
+ });
+ expect_parse_ok(sql, expected)?;
+
+ // positive case: partitioned by
+ let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1, p2) LOCATION 'foo.csv'";
+ let display = None;
+ let expected = Statement::CreateExternalTable(CreateExternalTable {
+ name: "t".into(),
+ columns: vec![make_column_def("c1", DataType::Int(display))],
+ file_type: FileType::CSV,
+ has_header: false,
+ location: "foo.csv".into(),
+ table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
});
expect_parse_ok(sql, expected)?;
@@ -391,6 +448,7 @@ mod tests {
file_type: FileType::CSV,
has_header: true,
location: "foo.csv".into(),
+ table_partition_cols: vec![],
});
expect_parse_ok(sql, expected)?;
}
@@ -403,6 +461,7 @@ mod tests {
file_type: FileType::Parquet,
has_header: false,
location: "foo.parquet".into(),
+ table_partition_cols: vec![],
});
expect_parse_ok(sql, expected)?;
@@ -414,6 +473,7 @@ mod tests {
file_type: FileType::Parquet,
has_header: false,
location: "foo.parquet".into(),
+ table_partition_cols: vec![],
});
expect_parse_ok(sql, expected)?;
@@ -425,6 +485,7 @@ mod tests {
file_type: FileType::Avro,
has_header: false,
location: "foo.avro".into(),
+ table_partition_cols: vec![],
});
expect_parse_ok(sql, expected)?;
@@ -433,6 +494,11 @@ mod tests {
"CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'";
expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV");
+ // Error case: partition columns do not accept a data type
+ let sql =
+ "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'";
+ expect_parse_error(sql, "sql parser error: Expected ',' or ')' after partition definition, found: int");
+
Ok(())
}
}
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 4409110..b59444e 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -308,6 +308,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
file_type,
has_header,
location,
+ table_partition_cols,
} = statement;
// semantic checks
@@ -333,6 +334,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
location,
file_type,
has_header,
+ table_partition_cols,
}))
}
diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs
index 4301e65..d5392e9 100644
--- a/datafusion/core/tests/parquet_pruning.rs
+++ b/datafusion/core/tests/parquet_pruning.rs
@@ -37,7 +37,7 @@ use datafusion::{
accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan,
ExecutionPlanVisitor,
},
- prelude::{SessionConfig, SessionContext},
+ prelude::{ParquetReadOptions, SessionConfig, SessionContext},
scalar::ScalarValue,
};
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
@@ -482,7 +482,9 @@ impl ContextWithParquet {
// now, set up the file as a data source and run a query against it
let ctx = SessionContext::with_config(config);
- ctx.register_parquet("t", &parquet_path).await.unwrap();
+ ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default())
+ .await
+ .unwrap();
let provider = ctx.deregister_table("t").unwrap().unwrap();
ctx.register_table("t", provider.clone()).unwrap();
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 724fffc..a788316 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -624,6 +624,7 @@ async fn register_alltypes_parquet(ctx: &SessionContext) {
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
+ ParquetReadOptions::default(),
)
.await
.unwrap();
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 6dfb37e..7c1bb7c 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -52,9 +52,13 @@ async fn parquet_query() {
async fn parquet_single_nan_schema() {
let ctx = SessionContext::new();
let testdata = datafusion::test_util::parquet_test_data();
- ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
- .await
- .unwrap();
+ ctx.register_parquet(
+ "single_nan",
+ &format!("{}/single_nan.parquet", testdata),
+ ParquetReadOptions::default(),
+ )
+ .await
+ .unwrap();
let sql = "SELECT mycol FROM single_nan";
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
@@ -75,6 +79,7 @@ async fn parquet_list_columns() {
ctx.register_parquet(
"list_columns",
&format!("{}/list_columns.parquet", testdata),
+ ParquetReadOptions::default(),
)
.await
.unwrap();
@@ -214,7 +219,10 @@ async fn schema_merge_ignores_metadata() {
// (no errors)
let ctx = SessionContext::new();
let df = ctx
- .read_parquet(table_dir.to_str().unwrap().to_string())
+ .read_parquet(
+ table_dir.to_str().unwrap().to_string(),
+ ParquetReadOptions::default(),
+ )
.await
.unwrap();
let result = df.collect().await.unwrap();
diff --git a/docs/source/user-guide/sql/ddl.md b/docs/source/user-guide/sql/ddl.md
index eb10f40..75ec0f6 100644
--- a/docs/source/user-guide/sql/ddl.md
+++ b/docs/source/user-guide/sql/ddl.md
@@ -55,6 +55,21 @@ WITH HEADER ROW
LOCATION '/path/to/aggregate_test_100.csv';
```
+If data sources are already partitioned in Hive style, `PARTITIONED BY` can be used for partition pruning.
+
+```
+/mnt/nyctaxi/year=2022/month=01/tripdata.parquet
+/mnt/nyctaxi/year=2021/month=12/tripdata.parquet
+/mnt/nyctaxi/year=2021/month=11/tripdata.parquet
+```
+
+```sql
+CREATE EXTERNAL TABLE taxi
+STORED AS PARQUET
+PARTITIONED BY (year, month)
+LOCATION '/mnt/nyctaxi';
+```
+
## CREATE MEMORY TABLE
Memory table can be created with query.