You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/09/07 17:21:07 UTC
[arrow-datafusion] branch main updated: Support Configuring Parquet Column Specific Options via SQL Statement Options (#7466)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ec1c8b8931 Support Configuring Parquet Column Specific Options via SQL Statement Options (#7466)
ec1c8b8931 is described below
commit ec1c8b8931502160f50cd3fe78f785957e626c9f
Author: Devin D'Angelo <de...@gmail.com>
AuthorDate: Thu Sep 7 13:21:00 2023 -0400
Support Configuring Parquet Column Specific Options via SQL Statement Options (#7466)
* alt syntax for compression implemented
* add unit test
* expand copy.slt test
---
datafusion/common/src/file_options/mod.rs | 120 +++++++++++++++++++++
.../common/src/file_options/parquet_writer.rs | 85 +++++++++++----
datafusion/common/src/file_options/parse_utils.rs | 13 +++
datafusion/sqllogictest/test_files/copy.slt | 10 +-
4 files changed, 205 insertions(+), 23 deletions(-)
diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 29ee73f80f..ab792bdca7 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -377,6 +377,126 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_writeroptions_parquet_column_specific() -> Result<()> {
+ let mut option_map: HashMap<String, String> = HashMap::new();
+
+ option_map.insert("bloom_filter_enabled::col1".to_owned(), "true".to_owned());
+ option_map.insert(
+ "bloom_filter_enabled::col2.nested".to_owned(),
+ "true".to_owned(),
+ );
+ option_map.insert("encoding::col1".to_owned(), "plain".to_owned());
+ option_map.insert("encoding::col2.nested".to_owned(), "rle".to_owned());
+ option_map.insert("dictionary_enabled::col1".to_owned(), "true".to_owned());
+ option_map.insert(
+ "dictionary_enabled::col2.nested".to_owned(),
+ "true".to_owned(),
+ );
+ option_map.insert("compression::col1".to_owned(), "zstd(4)".to_owned());
+ option_map.insert("compression::col2.nested".to_owned(), "zstd(10)".to_owned());
+ option_map.insert("statistics_enabled::col1".to_owned(), "page".to_owned());
+ option_map.insert(
+ "statistics_enabled::col2.nested".to_owned(),
+ "none".to_owned(),
+ );
+ option_map.insert("bloom_filter_fpp::col1".to_owned(), "0.123".to_owned());
+ option_map.insert(
+ "bloom_filter_fpp::col2.nested".to_owned(),
+ "0.456".to_owned(),
+ );
+ option_map.insert("bloom_filter_ndv::col1".to_owned(), "123".to_owned());
+ option_map.insert("bloom_filter_ndv::col2.nested".to_owned(), "456".to_owned());
+
+ let options = StatementOptions::from(&option_map);
+ let config = ConfigOptions::new();
+
+ let parquet_options = ParquetWriterOptions::try_from((&config, &options))?;
+ let properties = parquet_options.writer_options();
+
+ let col1 = ColumnPath::from(vec!["col1".to_owned()]);
+ let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);
+
+ // Verify the expected options propagated down to parquet crate WriterProperties struct
+
+ properties
+ .bloom_filter_properties(&col1)
+ .expect("expected bloom filter enabled for col1");
+
+ properties
+ .bloom_filter_properties(&col2_nested)
+ .expect("expected bloom filter enabled cor col2_nested");
+
+ assert_eq!(
+ properties.encoding(&col1).expect("expected encoding"),
+ Encoding::PLAIN
+ );
+
+ assert_eq!(
+ properties
+ .encoding(&col2_nested)
+ .expect("expected encoding"),
+ Encoding::RLE
+ );
+
+ assert!(properties.dictionary_enabled(&col1));
+ assert!(properties.dictionary_enabled(&col2_nested));
+
+ assert_eq!(
+ properties.compression(&col1),
+ Compression::ZSTD(ZstdLevel::try_new(4_i32)?)
+ );
+
+ assert_eq!(
+ properties.compression(&col2_nested),
+ Compression::ZSTD(ZstdLevel::try_new(10_i32)?)
+ );
+
+ assert_eq!(
+ properties.statistics_enabled(&col1),
+ EnabledStatistics::Page
+ );
+
+ assert_eq!(
+ properties.statistics_enabled(&col2_nested),
+ EnabledStatistics::None
+ );
+
+ assert_eq!(
+ properties
+ .bloom_filter_properties(&col1)
+ .expect("expected bloom properties!")
+ .fpp,
+ 0.123
+ );
+
+ assert_eq!(
+ properties
+ .bloom_filter_properties(&col2_nested)
+ .expect("expected bloom properties!")
+ .fpp,
+ 0.456
+ );
+
+ assert_eq!(
+ properties
+ .bloom_filter_properties(&col1)
+ .expect("expected bloom properties!")
+ .ndv,
+ 123
+ );
+
+ assert_eq!(
+ properties
+ .bloom_filter_properties(&col2_nested)
+ .expect("expected bloom properties!")
+ .ndv,
+ 456
+ );
+
+ Ok(())
+ }
+
#[test]
fn test_writeroptions_csv_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index fed773f29e..5cbbdf365e 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -28,7 +28,7 @@ use crate::{
DataFusionError, Result,
};
-use super::StatementOptions;
+use super::{parse_utils::split_option_and_column_path, StatementOptions};
/// Options for writing parquet files
#[derive(Clone, Debug)]
@@ -121,6 +121,7 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
let statement_options = configs_and_statement_options.1;
let mut builder = default_builder(configs)?;
for (option, value) in &statement_options.options {
+ let (option, col_path) = split_option_and_column_path(option);
builder = match option.to_lowercase().as_str(){
"max_row_group_size" => builder
.set_max_row_group_size(value.parse()
@@ -144,27 +145,67 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
"data_page_row_count_limit" => builder
.set_data_page_row_count_limit(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
- "bloom_filter_enabled" => builder
- .set_bloom_filter_enabled(value.parse()
- .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
- "encoding" => builder
- .set_encoding(parse_encoding_string(value)?),
- "dictionary_enabled" => builder
- .set_dictionary_enabled(value.parse()
- .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
- "compression" => builder
- .set_compression(parse_compression_string(value)?),
- "statistics_enabled" => builder
- .set_statistics_enabled(parse_statistics_string(value)?),
- "max_statistics_size" => builder
- .set_max_statistics_size(value.parse()
- .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
- "bloom_filter_fpp" => builder
- .set_bloom_filter_fpp(value.parse()
- .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?),
- "bloom_filter_ndv" => builder
- .set_bloom_filter_ndv(value.parse()
- .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?),
+ "bloom_filter_enabled" => {
+ let parsed_value = value.parse()
+ .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
+ match col_path{
+ Some(path) => builder.set_column_bloom_filter_enabled(path, parsed_value),
+ None => builder.set_bloom_filter_enabled(parsed_value)
+ }
+ },
+ "encoding" => {
+ let parsed_encoding = parse_encoding_string(value)?;
+ match col_path{
+ Some(path) => builder.set_column_encoding(path, parsed_encoding),
+ None => builder.set_encoding(parsed_encoding)
+ }
+ },
+ "dictionary_enabled" => {
+ let parsed_value = value.parse()
+ .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
+ match col_path{
+ Some(path) => builder.set_column_dictionary_enabled(path, parsed_value),
+ None => builder.set_dictionary_enabled(parsed_value)
+ }
+ },
+ "compression" => {
+ let parsed_compression = parse_compression_string(value)?;
+ match col_path{
+ Some(path) => builder.set_column_compression(path, parsed_compression),
+ None => builder.set_compression(parsed_compression)
+ }
+ },
+ "statistics_enabled" => {
+ let parsed_value = parse_statistics_string(value)?;
+ match col_path{
+ Some(path) => builder.set_column_statistics_enabled(path, parsed_value),
+ None => builder.set_statistics_enabled(parsed_value)
+ }
+ },
+ "max_statistics_size" => {
+ let parsed_value = value.parse()
+ .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?;
+ match col_path{
+ Some(path) => builder.set_column_max_statistics_size(path, parsed_value),
+ None => builder.set_max_statistics_size(parsed_value)
+ }
+ },
+ "bloom_filter_fpp" => {
+ let parsed_value = value.parse()
+ .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?;
+ match col_path{
+ Some(path) => builder.set_column_bloom_filter_fpp(path, parsed_value),
+ None => builder.set_bloom_filter_fpp(parsed_value)
+ }
+ },
+ "bloom_filter_ndv" => {
+ let parsed_value = value.parse()
+ .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?;
+ match col_path{
+ Some(path) => builder.set_column_bloom_filter_ndv(path, parsed_value),
+ None => builder.set_bloom_filter_ndv(parsed_value)
+ }
+ },
_ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for Parquet format!")))
}
}
diff --git a/datafusion/common/src/file_options/parse_utils.rs b/datafusion/common/src/file_options/parse_utils.rs
index da8d31436b..5e47d17817 100644
--- a/datafusion/common/src/file_options/parse_utils.rs
+++ b/datafusion/common/src/file_options/parse_utils.rs
@@ -20,6 +20,7 @@
use parquet::{
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
+ schema::types::ColumnPath,
};
use crate::{DataFusionError, Result};
@@ -181,3 +182,15 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatis
))),
}
}
+
+pub(crate) fn split_option_and_column_path(
+ str_setting: &str,
+) -> (String, Option<ColumnPath>) {
+ match str_setting.replace('\'', "").split_once("::") {
+ Some((s1, s2)) => {
+ let col_path = ColumnPath::new(s2.split('.').map(|s| s.to_owned()).collect());
+ (s1.to_owned(), Some(col_path))
+ }
+ None => (str_setting.to_owned(), None),
+ }
+}
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index f095552dad..3ade43b4e8 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -70,7 +70,9 @@ COPY source_table
TO 'test_files/scratch/copy/table_with_options'
(format parquet,
single_file_output false,
-compression 'snappy',
+compression snappy,
+'compression::col1' 'zstd(5)',
+'compression::col2' snappy,
max_row_group_size 12345,
data_pagesize_limit 1234,
write_batch_size 1234,
@@ -80,9 +82,15 @@ created_by 'DF copy.slt',
column_index_truncate_length 123,
data_page_row_count_limit 1234,
bloom_filter_enabled true,
+'bloom_filter_enabled::col1' false,
+'bloom_filter_fpp::col2' 0.456,
+'bloom_filter_ndv::col2' 456,
encoding plain,
+'encoding::col1' DELTA_BINARY_PACKED,
+'dictionary_enabled::col2' true,
dictionary_enabled false,
statistics_enabled page,
+'statistics_enabled::col2' none,
max_statistics_size 123,
bloom_filter_fpp 0.001,
bloom_filter_ndv 100