Posted to commits@arrow.apache.org by al...@apache.org on 2024/02/19 07:32:33 UTC

(arrow-datafusion) branch main updated: Support Copy To Partitioned Files (#9240)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b2a04519da Support Copy To Partitioned Files (#9240)
b2a04519da is described below

commit b2a04519da97c2ff81789ef41dd652870794a73a
Author: Devin D'Angelo <de...@gmail.com>
AuthorDate: Mon Feb 19 02:32:27 2024 -0500

    Support Copy To Partitioned Files (#9240)
    
    * support copy to partitioned files
    
    * remove print statements
    
    * fmt
    
    * fix tests and use err macro
    
    * cargo doc fix
    
    * add partition directory specific test
    
    * try to support columns with single quotes in name
---
 datafusion/common/src/file_options/mod.rs          | 25 +++++++
 datafusion/core/src/dataframe/mod.rs               | 12 ++++
 datafusion/core/src/dataframe/parquet.rs           |  1 +
 .../core/src/datasource/file_format/write/demux.rs | 28 +++++---
 datafusion/core/src/physical_planner.rs            | 10 ++-
 datafusion/expr/src/logical_plan/builder.rs        |  2 +
 datafusion/expr/src/logical_plan/dml.rs            |  2 +
 datafusion/expr/src/logical_plan/plan.rs           |  4 +-
 datafusion/proto/src/logical_plan/mod.rs           |  2 +
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  4 ++
 datafusion/sql/src/statement.rs                    |  2 +
 datafusion/sqllogictest/test_files/copy.slt        | 84 ++++++++++++++++++++++
 12 files changed, 163 insertions(+), 13 deletions(-)
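
The headline change: COPY can now write hive-style partitioned output. From the
new sqllogictest cases added below, the SQL surface looks like this:

    COPY source_table TO 'test_files/scratch/copy/partitioned_table1/'
    (format parquet, compression 'zstd(10)', partition_by 'col2');

Each distinct value of col2 lands in its own subdirectory (e.g. col2=Bar/), the
same layout that PARTITIONED BY external tables expect on read.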

diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 1d661b17eb..3a48f188fb 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -97,6 +97,31 @@ impl StatementOptions {
         maybe_option.map(|(_, v)| v)
     }
 
+    /// Finds the partition_by option, if present, and parses it into a `Vec<String>`.
+    /// If the option doesn't exist, returns an empty `vec![]`.
+    /// E.g. (partition_by 'colA, colB, colC') -> `vec!["colA", "colB", "colC"]`
+    pub fn take_partition_by(&mut self) -> Vec<String> {
+        let partition_by = self.take_str_option("partition_by");
+        match partition_by {
+            Some(part_cols) => {
+                let dequoted = part_cols
+                    .chars()
+                    .enumerate()
+                    .filter(|(idx, c)| {
+                        !((*idx == 0 || *idx == part_cols.len() - 1)
+                            && (*c == '\'' || *c == '"'))
+                    })
+                    .map(|(_idx, c)| c)
+                    .collect::<String>();
+                dequoted
+                    .split(',')
+                    .map(|s| s.trim().replace("''", "'"))
+                    .collect::<Vec<_>>()
+            }
+            None => vec![],
+        }
+    }
+
     /// Infers the file_type given a target and arbitrary options.
     /// If the options contain an explicit "format" option, that will be used.
     /// Otherwise, attempt to infer file_type from the extension of target.
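
As a standalone sketch, the parsing above behaves like the hypothetical free
function below (it mirrors `take_partition_by`; the name and the trailing
assertion are illustrative, not part of the DataFusion API):

    /// Mirrors the dequote-then-split logic of `take_partition_by`.
    fn parse_partition_by(part_cols: &str) -> Vec<String> {
        // Strip a single quote character from the first and last positions only.
        let dequoted: String = part_cols
            .chars()
            .enumerate()
            .filter(|(idx, c)| {
                !((*idx == 0 || *idx == part_cols.len() - 1)
                    && (*c == '\'' || *c == '"'))
            })
            .map(|(_, c)| c)
            .collect();
        // Split on commas, trim whitespace, and unescape doubled single quotes.
        dequoted
            .split(',')
            .map(|s| s.trim().replace("''", "'"))
            .collect()
    }

    // parse_partition_by("'colA, colB, colC'") == ["colA", "colB", "colC"]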
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 237f14d2c0..81247908df 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -73,6 +73,9 @@ pub struct DataFrameWriteOptions {
     /// Allows compression of CSV and JSON.
     /// Not supported for parquet.
     compression: CompressionTypeVariant,
+    /// Names the columns to use for hive-style partitioned writes.
+    /// Leave as an empty vec![] for non-partitioned writes.
+    partition_by: Vec<String>,
 }
 
 impl DataFrameWriteOptions {
@@ -82,6 +85,7 @@ impl DataFrameWriteOptions {
             overwrite: false,
             single_file_output: false,
             compression: CompressionTypeVariant::UNCOMPRESSED,
+            partition_by: vec![],
         }
     }
     /// Set the overwrite option to true or false
@@ -101,6 +105,12 @@ impl DataFrameWriteOptions {
         self.compression = compression;
         self
     }
+
+    /// Sets the partition_by columns for output partitioning
+    pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
+        self.partition_by = partition_by;
+        self
+    }
 }
 
 impl Default for DataFrameWriteOptions {
@@ -1176,6 +1186,7 @@ impl DataFrame {
             self.plan,
             path.into(),
             FileType::CSV,
+            options.partition_by,
             copy_options,
         )?
         .build()?;
@@ -1219,6 +1230,7 @@ impl DataFrame {
             self.plan,
             path.into(),
             FileType::JSON,
+            options.partition_by,
             copy_options,
         )?
         .build()?;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 00a0e780d5..184d3c8cb2 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -73,6 +73,7 @@ impl DataFrame {
             self.plan,
             path.into(),
             FileType::PARQUET,
+            options.partition_by,
             copy_options,
         )?
         .build()?;
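
With the new builder method, a partitioned write through the DataFrame API looks
roughly like this (a sketch: it assumes a SessionContext with source_table
registered, and the write_parquet signature as of this DataFusion version):

    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::prelude::*;

    async fn write_partitioned(ctx: &SessionContext) -> datafusion::error::Result<()> {
        let df = ctx.table("source_table").await?;
        df.write_parquet(
            "test_files/scratch/copy/partitioned_table1/",
            // Hive-style layout: one directory per distinct col2 value.
            DataFrameWriteOptions::new().with_partition_by(vec!["col2".to_string()]),
            None, // default writer properties
        )
        .await?;
        Ok(())
    }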
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs
index 94d915827e..1f7c243e98 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/core/src/datasource/file_format/write/demux.rs
@@ -32,7 +32,7 @@ use arrow_array::cast::AsArray;
 use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray};
 use arrow_schema::{DataType, Schema};
 use datafusion_common::cast::as_string_array;
-use datafusion_common::DataFusionError;
+use datafusion_common::{exec_datafusion_err, DataFusionError};
 
 use datafusion_execution::TaskContext;
 
@@ -319,14 +319,20 @@ fn compute_partition_keys_by_row<'a>(
 ) -> Result<Vec<Vec<&'a str>>> {
     let mut all_partition_values = vec![];
 
-    for (col, dtype) in partition_by.iter() {
+    // For the purposes of writing partitioned data, we can rely on schema inference
+    // to determine the type of the partition cols in order to provide a more ergonomic
+    // UI which does not require specifying DataTypes manually. So, we ignore the
+    // DataType within the partition_by array and infer the correct type from the
+    // batch schema instead.
+    let schema = rb.schema();
+    for (col, _) in partition_by.iter() {
         let mut partition_values = vec![];
-        let col_array =
-            rb.column_by_name(col)
-                .ok_or(DataFusionError::Execution(format!(
-                    "PartitionBy Column {} does not exist in source data!",
-                    col
-                )))?;
+
+        let dtype = schema.field_with_name(col)?.data_type();
+        let col_array = rb.column_by_name(col).ok_or(exec_datafusion_err!(
+            "PartitionBy Column {} does not exist in source data! Got schema {schema}.",
+            col
+        ))?;
 
         match dtype {
             DataType::Utf8 => {
@@ -339,12 +345,12 @@ fn compute_partition_keys_by_row<'a>(
                 downcast_dictionary_array!(
                     col_array =>  {
                         let array = col_array.downcast_dict::<StringArray>()
-                            .ok_or(DataFusionError::Execution(format!("it is not yet supported to write to hive partitions with datatype {}",
-                            dtype)))?;
+                            .ok_or(exec_datafusion_err!("it is not yet supported to write to hive partitions with datatype {}",
+                            dtype))?;
 
                         for val in array.values() {
                             partition_values.push(
-                                val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value for column {}", col)))?
+                                val.ok_or(exec_datafusion_err!("Cannot partition by null value for column {}", col))?
                             );
                         }
                     },
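
The net effect is that the type used for partitioning now comes from the batch
itself. A minimal sketch of the lookup, using the arrow-rs APIs seen above:

    use arrow_array::RecordBatch;
    use arrow_schema::{ArrowError, DataType};

    /// Resolve a partition column's real type from the RecordBatch schema,
    /// ignoring whatever placeholder type the plan carried for it.
    fn partition_col_type(rb: &RecordBatch, col: &str) -> Result<DataType, ArrowError> {
        Ok(rb.schema().field_with_name(col)?.data_type().clone())
    }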
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 463d0cde82..dabf0a91b2 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -568,6 +568,7 @@ impl DefaultPhysicalPlanner {
                     output_url,
                     file_format,
                     copy_options,
+                    partition_by,
                 }) => {
                     let input_exec = self.create_initial_plan(input, session_state).await?;
                     let parsed_url = ListingTableUrl::parse(output_url)?;
@@ -585,13 +586,20 @@ impl DefaultPhysicalPlanner {
                         CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
                     };
 
+                    // Note: the DataType passed here is ignored for the purposes of writing and inferred instead
+                    // from the schema of the RecordBatch being written. This allows COPY statements to specify only
+                    // the column name rather than column name + explicit data type.
+                    let table_partition_cols = partition_by.iter()
+                        .map(|s| (s.to_string(), arrow_schema::DataType::Null))
+                        .collect::<Vec<_>>();
+
                     // Set file sink related options
                     let config = FileSinkConfig {
                         object_store_url,
                         table_paths: vec![parsed_url],
                         file_groups: vec![],
                         output_schema: Arc::new(schema),
-                        table_partition_cols: vec![],
+                        table_partition_cols,
                         overwrite: false,
                         file_type_writer_options
                     };
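
In other words, the planner pairs each requested column name with a placeholder
type and defers the real type to the write path (an illustrative helper only;
the code above does this inline):

    use arrow_schema::DataType;

    /// COPY carries only column names; the demuxer infers concrete types from
    /// the RecordBatch schema, so DataType::Null serves as a placeholder here.
    fn to_table_partition_cols(partition_by: &[String]) -> Vec<(String, DataType)> {
        partition_by
            .iter()
            .map(|s| (s.clone(), DataType::Null))
            .collect()
    }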
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 39df96d61f..0662396f61 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -263,12 +263,14 @@ impl LogicalPlanBuilder {
         input: LogicalPlan,
         output_url: String,
         file_format: FileType,
+        partition_by: Vec<String>,
         copy_options: CopyOptions,
     ) -> Result<Self> {
         Ok(Self::from(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url,
             file_format,
+            partition_by,
             copy_options,
         })))
     }
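
A caller-side sketch of the updated entry point (assumes an existing LogicalPlan
named `input`; `StatementOptions::new` taking a list of key/value pairs is an
assumption about this version's API):

    // Build a partitioned COPY plan programmatically.
    let plan = LogicalPlanBuilder::copy_to(
        input,
        "test_files/scratch/copy/partitioned_table1/".to_string(),
        FileType::PARQUET,
        vec!["col2".to_string()], // new: partition_by columns
        CopyOptions::SQLOptions(StatementOptions::new(vec![])),
    )?
    .build()?;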
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 794c649989..a55781eda6 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -36,6 +36,8 @@ pub struct CopyTo {
     pub output_url: String,
     /// The file format to output (explicitly defined or inferred from file extension)
     pub file_format: FileType,
+    /// Determines which, if any, columns should be used for hive-style partitioned writes
+    pub partition_by: Vec<String>,
     /// Arbitrary options as tuples
     pub copy_options: CopyOptions,
 }
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ba768cf3c6..aa5dff25ef 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -615,12 +615,13 @@ impl LogicalPlan {
                 input: _,
                 output_url,
                 file_format,
+                partition_by,
                 copy_options,
             }) => Ok(LogicalPlan::Copy(CopyTo {
                 input: Arc::new(inputs.swap_remove(0)),
                 output_url: output_url.clone(),
                 file_format: file_format.clone(),
-
+                partition_by: partition_by.clone(),
                 copy_options: copy_options.clone(),
             })),
             LogicalPlan::Values(Values { schema, .. }) => {
@@ -1551,6 +1552,7 @@ impl LogicalPlan {
                         input: _,
                         output_url,
                         file_format,
+                        partition_by: _,
                         copy_options,
                     }) => {
                         let op_str = match copy_options {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 7a6dab85de..aaaf165e32 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -918,6 +918,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         input: Arc::new(input),
                         output_url: copy.output_url.clone(),
                         file_format: FileType::from_str(&copy.file_type)?,
+                        partition_by: vec![],
                         copy_options,
                     },
                 ))
@@ -1641,6 +1642,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 output_url,
                 file_format,
                 copy_options,
+                partition_by: _,
             }) => {
                 let input = protobuf::LogicalPlanNode::try_from_logical_plan(
                     input,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 68a318b5a6..81f5997547 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -324,6 +324,7 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
         input: Arc::new(input),
         output_url: "test.csv".to_string(),
         file_format: FileType::CSV,
+        partition_by: vec![],
         copy_options: CopyOptions::SQLOptions(StatementOptions::from(&options)),
     });
 
@@ -354,6 +355,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
         input: Arc::new(input),
         output_url: "test.parquet".to_string(),
         file_format: FileType::PARQUET,
+        partition_by: vec![],
         copy_options: CopyOptions::WriterOptions(Box::new(
             FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
         )),
@@ -402,6 +404,7 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
         input: Arc::new(input),
         output_url: "test.arrow".to_string(),
         file_format: FileType::ARROW,
+        partition_by: vec![],
         copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::Arrow(
             ArrowWriterOptions::new(),
         ))),
@@ -447,6 +450,7 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
         input: Arc::new(input),
         output_url: "test.csv".to_string(),
         file_format: FileType::CSV,
+        partition_by: vec![],
         copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::CSV(
             CsvWriterOptions::new(
                 writer_properties,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 47eca70ef3..bf15146a92 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -718,6 +718,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         let mut statement_options = StatementOptions::new(options);
         let file_format = statement_options.try_infer_file_type(&statement.target)?;
+        let partition_by = statement_options.take_partition_by();
 
         let copy_options = CopyOptions::SQLOptions(statement_options);
 
@@ -725,6 +726,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             input: Arc::new(input),
             output_url: statement.target,
             file_format,
+            partition_by,
             copy_options,
         }))
     }
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index dd2ce16a63..51b46d710b 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -25,6 +25,90 @@ COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compressi
 ----
 2
 
+# Copy to directory as partitioned files
+query IT
+COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' (format parquet, compression 'zstd(10)', partition_by 'col2');
+----
+2
+
+# validate multiple partitioned parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet STORED AS PARQUET 
+LOCATION 'test_files/scratch/copy/partitioned_table1/' PARTITIONED BY (col2);
+
+query I?
+select * from validate_partitioned_parquet order by col1, col2;
+----
+1 Foo
+2 Bar
+
+# validate partition paths were actually generated
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet_bar STORED AS PARQUET 
+LOCATION 'test_files/scratch/copy/partitioned_table1/col2=Bar';
+
+query I
+select * from validate_partitioned_parquet_bar order by col1;
+----
+2
+
+# Copy to directory as partitioned files
+query ITT
+COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/' 
+(format parquet, compression 'zstd(10)', partition_by 'column2, column3');
+----
+3
+
+# validate multiple partitioned parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet2 STORED AS PARQUET 
+LOCATION 'test_files/scratch/copy/partitioned_table2/' PARTITIONED BY (column2, column3);
+
+query I??
+select * from validate_partitioned_parquet2 order by column1,column2,column3;
+----
+1 a x
+2 b y
+3 c z
+
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet_a_x STORED AS PARQUET 
+LOCATION 'test_files/scratch/copy/partitioned_table2/column2=a/column3=x';
+
+query I
+select * from validate_partitioned_parquet_a_x order by column1;
+----
+1
+
+statement ok
+create table test ("'test'" varchar, "'test2'" varchar, "'test3'" varchar); 
+
+query TTT
+insert into test VALUES ('a', 'x', 'aa'), ('b','y', 'bb'), ('c', 'z', 'cc')
+----
+3
+
+query T
+select "'test'" from test
+----
+a
+b
+c
+
+# Note: to place a single ' inside a string literal, escape it as two '' characters
+query TTT
+copy test to 'test_files/scratch/copy/escape_quote' (format csv, partition_by '''test2'',''test3''')
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV 
+LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", "'test3'");
+
+# This triggers a panic (index out of bounds)
+#query
+#select * from validate_partitioned_escape_quote;
+
 query TT
 EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compression 'zstd(10)');
 ----