You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/02 11:20:06 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #2099: feat: 2061 create external table ddl table partition cols

alamb commented on a change in pull request #2099:
URL: https://github.com/apache/arrow-datafusion/pull/2099#discussion_r841063796



##########
File path: datafusion/core/src/execution/context.rs
##########
@@ -548,20 +560,19 @@ impl SessionContext {
 
     /// Registers a Parquet data source so that it can be referenced from SQL statements
     /// executed against this context.
-    pub async fn register_parquet(&self, name: &str, uri: &str) -> Result<()> {
-        let (target_partitions, enable_pruning) = {
+    pub async fn register_parquet(
+        &self,
+        name: &str,
+        uri: &str,
+        options: ParquetReadOptions<'_>,
+    ) -> Result<()> {
+        let (target_partitions, parquet_pruning) = {

Review comment:
       Maybe we should eventually move the `parquet_pruning` option out of `SessionContext` and into the `ParquetReadOptions` structure. As a follow on PR

##########
File path: datafusion/core/src/execution/options.rs
##########
@@ -115,7 +128,57 @@ impl<'a> CsvReadOptions<'a> {
             collect_stat: false,
             file_extension: self.file_extension.to_owned(),
             target_partitions,
+            table_partition_cols: self.table_partition_cols.clone(),
+        }
+    }
+}
+
+/// Parquet read options
+#[derive(Clone)]
+pub struct ParquetReadOptions<'a> {
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".parquet".
+    pub file_extension: &'a str,
+    /// Partition Columns
+    pub table_partition_cols: Vec<String>,
+    /// Should DataFusion parquet reader using the predicate to prune data, following execution::context::SessionConfig
+    pub parquet_pruning: bool,
+}
+
+impl<'a> Default for ParquetReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            file_extension: DEFAULT_PARQUET_EXTENSION,
             table_partition_cols: vec![],
+            parquet_pruning: ParquetFormat::default().enable_pruning(),
+        }
+    }
+}
+
+impl<'a> ParquetReadOptions<'a> {
+    /// Specify parquet_pruning
+    pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
+        self.parquet_pruning = parquet_pruning;
+        self
+    }
+
+    /// Specify table_partition_cols for partition pruning
+    pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Helper to convert these user facing options to `ListingTable` options
+    pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {

Review comment:
       👍 

##########
File path: datafusion/core/src/sql/parser.rs
##########
@@ -192,6 +194,35 @@ impl<'a> DFParser<'a> {
         }
     }
 
+    fn parse_partitions(&mut self) -> Result<Vec<String>, ParserError> {
+        let mut partitions: Vec<String> = vec![];
+        if !self.parser.consume_token(&Token::LParen)
+            || self.parser.consume_token(&Token::RParen)
+        {
+            return Ok(partitions);
+        }
+
+        loop {

Review comment:
       Given this code to parse a comma separated list is duplicated in `parse_columns` below, perhaps we could refactor into a common function to reduce the replication  -- not needed for this PR though

##########
File path: datafusion/core/src/sql/parser.rs
##########
@@ -277,6 +308,12 @@ impl<'a> DFParser<'a> {
 
         let has_header = self.parse_csv_has_header();
 
+        let has_partition = self.parse_has_partition();
+        let mut table_partition_cols: Vec<String> = vec![];
+        if has_partition {
+            table_partition_cols = self.parse_partitions()?;
+        }

Review comment:
       ```suggestion
           let table_partition_cols = if self.parse_has_partition() {
             self.parse_partitions()?;
           } else {
             vec![]
           };
   ```
   
   If you wanted to avoid a `mut`

##########
File path: datafusion/core/src/execution/options.rs
##########
@@ -43,8 +47,10 @@ pub struct CsvReadOptions<'a> {
     /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
     pub schema_infer_max_records: usize,
     /// File extension; only files with this extension are selected for data input.
-    /// Defaults to ".csv".
+    /// Defaults to DEFAULT_CSV_EXTENSION.

Review comment:
       👍 

##########
File path: datafusion/core/src/logical_plan/builder.rs
##########
@@ -274,21 +275,12 @@ impl LogicalPlanBuilder {
     pub async fn scan_parquet_with_name(
         object_store: Arc<dyn ObjectStore>,
         path: impl Into<String>,
+        options: ParquetReadOptions<'_>,
         projection: Option<Vec<usize>>,
         target_partitions: usize,
         table_name: impl Into<String>,
     ) -> Result<Self> {
-        // TODO remove hard coded enable_pruning

Review comment:
       ❤️ 

##########
File path: docs/source/user-guide/sql/ddl.md
##########
@@ -55,6 +55,21 @@ WITH HEADER ROW
 LOCATION '/path/to/aggregate_test_100.csv';
 ```
 
+If data sources are already partitioned in Hive style, `PARTITIONED BY` can be used for partition pruning.

Review comment:
       ❤️ 

##########
File path: datafusion/core/src/execution/options.rs
##########
@@ -115,7 +128,57 @@ impl<'a> CsvReadOptions<'a> {
             collect_stat: false,
             file_extension: self.file_extension.to_owned(),
             target_partitions,
+            table_partition_cols: self.table_partition_cols.clone(),
+        }
+    }
+}
+
+/// Parquet read options
+#[derive(Clone)]
+pub struct ParquetReadOptions<'a> {
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".parquet".
+    pub file_extension: &'a str,
+    /// Partition Columns
+    pub table_partition_cols: Vec<String>,
+    /// Should DataFusion parquet reader using the predicate to prune data, following execution::context::SessionConfig

Review comment:
       ```suggestion
       /// Should DataFusion parquet reader using the predicate to prune data, 
       /// overridden by value on execution::context::SessionConfig
   ```
   
   Maybe after this PR is merged, we can remove the option on `SessionConfig`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org