You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2024/02/19 07:32:33 UTC
(arrow-datafusion) branch main updated: Support Copy To Partitioned Files (#9240)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b2a04519da Support Copy To Partitioned Files (#9240)
b2a04519da is described below
commit b2a04519da97c2ff81789ef41dd652870794a73a
Author: Devin D'Angelo <de...@gmail.com>
AuthorDate: Mon Feb 19 02:32:27 2024 -0500
Support Copy To Partitioned Files (#9240)
* support copy to partitioned files
* remove print statements
* fmt
* fix tests and use err macro
* cargo doc fix
* add partition directory specific test
* try to support columns with single quotes in name
---
datafusion/common/src/file_options/mod.rs | 25 +++++++
datafusion/core/src/dataframe/mod.rs | 12 ++++
datafusion/core/src/dataframe/parquet.rs | 1 +
.../core/src/datasource/file_format/write/demux.rs | 28 +++++---
datafusion/core/src/physical_planner.rs | 10 ++-
datafusion/expr/src/logical_plan/builder.rs | 2 +
datafusion/expr/src/logical_plan/dml.rs | 2 +
datafusion/expr/src/logical_plan/plan.rs | 4 +-
datafusion/proto/src/logical_plan/mod.rs | 2 +
.../proto/tests/cases/roundtrip_logical_plan.rs | 4 ++
datafusion/sql/src/statement.rs | 2 +
datafusion/sqllogictest/test_files/copy.slt | 84 ++++++++++++++++++++++
12 files changed, 163 insertions(+), 13 deletions(-)
diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 1d661b17eb..3a48f188fb 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -97,6 +97,31 @@ impl StatementOptions {
maybe_option.map(|(_, v)| v)
}
+ /// Finds partition_by option if exists and parses into a `Vec<String>`.
+ /// If option doesn't exist, returns empty `vec![]`.
+ /// E.g. (partition_by 'colA, colB, colC') -> `vec!['colA','colB','colC']`
+ pub fn take_partition_by(&mut self) -> Vec<String> {
+ let partition_by = self.take_str_option("partition_by");
+ match partition_by {
+ Some(part_cols) => {
+ let dequoted = part_cols
+ .chars()
+ .enumerate()
+ .filter(|(idx, c)| {
+ !((*idx == 0 || *idx == part_cols.len() - 1)
+ && (*c == '\'' || *c == '"'))
+ })
+ .map(|(_idx, c)| c)
+ .collect::<String>();
+ dequoted
+ .split(',')
+ .map(|s| s.trim().replace("''", "'"))
+ .collect::<Vec<_>>()
+ }
+ None => vec![],
+ }
+ }
+
/// Infers the file_type given a target and arbitrary options.
/// If the options contain an explicit "format" option, that will be used.
/// Otherwise, attempt to infer file_type from the extension of target.
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 237f14d2c0..81247908df 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -73,6 +73,9 @@ pub struct DataFrameWriteOptions {
/// Allows compression of CSV and JSON.
/// Not supported for parquet.
compression: CompressionTypeVariant,
+ /// Sets which columns should be used for hive-style partitioned writes by name.
+ /// Can be set to empty vec![] for non-partitioned writes.
+ partition_by: Vec<String>,
}
impl DataFrameWriteOptions {
@@ -82,6 +85,7 @@ impl DataFrameWriteOptions {
overwrite: false,
single_file_output: false,
compression: CompressionTypeVariant::UNCOMPRESSED,
+ partition_by: vec![],
}
}
/// Set the overwrite option to true or false
@@ -101,6 +105,12 @@ impl DataFrameWriteOptions {
self.compression = compression;
self
}
+
+ /// Sets the partition_by columns for output partitioning
+ pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
+ self.partition_by = partition_by;
+ self
+ }
}
impl Default for DataFrameWriteOptions {
@@ -1176,6 +1186,7 @@ impl DataFrame {
self.plan,
path.into(),
FileType::CSV,
+ options.partition_by,
copy_options,
)?
.build()?;
@@ -1219,6 +1230,7 @@ impl DataFrame {
self.plan,
path.into(),
FileType::JSON,
+ options.partition_by,
copy_options,
)?
.build()?;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 00a0e780d5..184d3c8cb2 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -73,6 +73,7 @@ impl DataFrame {
self.plan,
path.into(),
FileType::PARQUET,
+ options.partition_by,
copy_options,
)?
.build()?;
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs
index 94d915827e..1f7c243e98 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/core/src/datasource/file_format/write/demux.rs
@@ -32,7 +32,7 @@ use arrow_array::cast::AsArray;
use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::cast::as_string_array;
-use datafusion_common::DataFusionError;
+use datafusion_common::{exec_datafusion_err, DataFusionError};
use datafusion_execution::TaskContext;
@@ -319,14 +319,20 @@ fn compute_partition_keys_by_row<'a>(
) -> Result<Vec<Vec<&'a str>>> {
let mut all_partition_values = vec![];
- for (col, dtype) in partition_by.iter() {
+ // For the purposes of writing partitioned data, we can rely on schema inference
+ // to determine the type of the partition cols in order to provide a more ergonomic
+ // UI which does not require specifying DataTypes manually. So, we ignore the
+ // DataType within the partition_by array and infer the correct type from the
+ // batch schema instead.
+ let schema = rb.schema();
+ for (col, _) in partition_by.iter() {
let mut partition_values = vec![];
- let col_array =
- rb.column_by_name(col)
- .ok_or(DataFusionError::Execution(format!(
- "PartitionBy Column {} does not exist in source data!",
- col
- )))?;
+
+ let dtype = schema.field_with_name(col)?.data_type();
+ let col_array = rb.column_by_name(col).ok_or(exec_datafusion_err!(
+ "PartitionBy Column {} does not exist in source data! Got schema {schema}.",
+ col
+ ))?;
match dtype {
DataType::Utf8 => {
@@ -339,12 +345,12 @@ fn compute_partition_keys_by_row<'a>(
downcast_dictionary_array!(
col_array => {
let array = col_array.downcast_dict::<StringArray>()
- .ok_or(DataFusionError::Execution(format!("it is not yet supported to write to hive partitions with datatype {}",
- dtype)))?;
+ .ok_or(exec_datafusion_err!("it is not yet supported to write to hive partitions with datatype {}",
+ dtype))?;
for val in array.values() {
partition_values.push(
- val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value for column {}", col)))?
+ val.ok_or(exec_datafusion_err!("Cannot partition by null value for column {}", col))?
);
}
},
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 463d0cde82..dabf0a91b2 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -568,6 +568,7 @@ impl DefaultPhysicalPlanner {
output_url,
file_format,
copy_options,
+ partition_by,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
let parsed_url = ListingTableUrl::parse(output_url)?;
@@ -585,13 +586,20 @@ impl DefaultPhysicalPlanner {
CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
};
+ // Note: the DataType passed here is ignored for the purposes of writing and inferred instead
+ // from the schema of the RecordBatch being written. This allows COPY statements to specify only
+ // the column name rather than column name + explicit data type.
+ let table_partition_cols = partition_by.iter()
+ .map(|s| (s.to_string(), arrow_schema::DataType::Null))
+ .collect::<Vec<_>>();
+
// Set file sink related options
let config = FileSinkConfig {
object_store_url,
table_paths: vec![parsed_url],
file_groups: vec![],
output_schema: Arc::new(schema),
- table_partition_cols: vec![],
+ table_partition_cols,
overwrite: false,
file_type_writer_options
};
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 39df96d61f..0662396f61 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -263,12 +263,14 @@ impl LogicalPlanBuilder {
input: LogicalPlan,
output_url: String,
file_format: FileType,
+ partition_by: Vec<String>,
copy_options: CopyOptions,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
file_format,
+ partition_by,
copy_options,
})))
}
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 794c649989..a55781eda6 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -36,6 +36,8 @@ pub struct CopyTo {
pub output_url: String,
/// The file format to output (explicitly defined or inferred from file extension)
pub file_format: FileType,
+ /// Determines which, if any, columns should be used for hive-style partitioned writes
+ pub partition_by: Vec<String>,
/// Arbitrary options as tuples
pub copy_options: CopyOptions,
}
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ba768cf3c6..aa5dff25ef 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -615,12 +615,13 @@ impl LogicalPlan {
input: _,
output_url,
file_format,
+ partition_by,
copy_options,
}) => Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(inputs.swap_remove(0)),
output_url: output_url.clone(),
file_format: file_format.clone(),
-
+ partition_by: partition_by.clone(),
copy_options: copy_options.clone(),
})),
LogicalPlan::Values(Values { schema, .. }) => {
@@ -1551,6 +1552,7 @@ impl LogicalPlan {
input: _,
output_url,
file_format,
+ partition_by: _,
copy_options,
}) => {
let op_str = match copy_options {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 7a6dab85de..aaaf165e32 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -918,6 +918,7 @@ impl AsLogicalPlan for LogicalPlanNode {
input: Arc::new(input),
output_url: copy.output_url.clone(),
+ file_format: FileType::from_str(&copy.file_type)?,
+ partition_by: vec![],
copy_options,
},
))
@@ -1641,6 +1642,7 @@ impl AsLogicalPlan for LogicalPlanNode {
output_url,
file_format,
copy_options,
+ partition_by: _,
}) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
input,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 68a318b5a6..81f5997547 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -324,6 +324,7 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
input: Arc::new(input),
output_url: "test.csv".to_string(),
file_format: FileType::CSV,
+ partition_by: vec![],
copy_options: CopyOptions::SQLOptions(StatementOptions::from(&options)),
});
@@ -354,6 +355,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
input: Arc::new(input),
output_url: "test.parquet".to_string(),
file_format: FileType::PARQUET,
+ partition_by: vec![],
copy_options: CopyOptions::WriterOptions(Box::new(
FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
)),
@@ -402,6 +404,7 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
input: Arc::new(input),
output_url: "test.arrow".to_string(),
file_format: FileType::ARROW,
+ partition_by: vec![],
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::Arrow(
ArrowWriterOptions::new(),
))),
@@ -447,6 +450,7 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
input: Arc::new(input),
output_url: "test.csv".to_string(),
file_format: FileType::CSV,
+ partition_by: vec![],
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::CSV(
CsvWriterOptions::new(
writer_properties,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 47eca70ef3..bf15146a92 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -718,6 +718,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut statement_options = StatementOptions::new(options);
let file_format = statement_options.try_infer_file_type(&statement.target)?;
+ let partition_by = statement_options.take_partition_by();
let copy_options = CopyOptions::SQLOptions(statement_options);
@@ -725,6 +726,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: Arc::new(input),
output_url: statement.target,
file_format,
+ partition_by,
copy_options,
}))
}
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index dd2ce16a63..51b46d710b 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -25,6 +25,90 @@ COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compressi
----
2
+# Copy to directory as partitioned files
+query IT
+COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' (format parquet, compression 'zstd(10)', partition_by 'col2');
+----
+2
+
+# validate multiple partitioned parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table1/' PARTITIONED BY (col2);
+
+query I?
+select * from validate_partitioned_parquet order by col1, col2;
+----
+1 Foo
+2 Bar
+
+# validate partition paths were actually generated
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet_bar STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table1/col2=Bar';
+
+query I
+select * from validate_partitioned_parquet_bar order by col1;
+----
+2
+
+# Copy to directory as partitioned files
+query ITT
+COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/'
+(format parquet, compression 'zstd(10)', partition_by 'column2, column3');
+----
+3
+
+# validate multiple partitioned parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet2 STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table2/' PARTITIONED BY (column2, column3);
+
+query I??
+select * from validate_partitioned_parquet2 order by column1,column2,column3;
+----
+1 a x
+2 b y
+3 c z
+
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet_a_x STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table2/column2=a/column3=x';
+
+query I
+select * from validate_partitioned_parquet_a_x order by column1;
+----
+1
+
+statement ok
+create table test ("'test'" varchar, "'test2'" varchar, "'test3'" varchar);
+
+query TTT
+insert into test VALUES ('a', 'x', 'aa'), ('b','y', 'bb'), ('c', 'z', 'cc')
+----
+3
+
+query T
+select "'test'" from test
+----
+a
+b
+c
+
+# Note to place a single ' inside of a literal string escape by putting two ''
+query TTT
+copy test to 'test_files/scratch/copy/escape_quote' (format csv, partition_by '''test2'',''test3''')
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV
+LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", "'test3'");
+
+# This triggers a panic (index out of bounds)
+#query
+#select * from validate_partitioned_escape_quote;
+
query TT
EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compression 'zstd(10)');
----