You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/09/07 17:21:07 UTC

[arrow-datafusion] branch main updated: Support Configuring Parquet Column Specific Options via SQL Statement Options (#7466)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ec1c8b8931 Support Configuring Parquet Column Specific Options via SQL Statement Options (#7466)
ec1c8b8931 is described below

commit ec1c8b8931502160f50cd3fe78f785957e626c9f
Author: Devin D'Angelo <de...@gmail.com>
AuthorDate: Thu Sep 7 13:21:00 2023 -0400

    Support Configuring Parquet Column Specific Options via SQL Statement Options (#7466)
    
    * alt syntax for compression implemented
    
    * add unit test
    
    * expand copy.slt test
---
 datafusion/common/src/file_options/mod.rs          | 120 +++++++++++++++++++++
 .../common/src/file_options/parquet_writer.rs      |  85 +++++++++++----
 datafusion/common/src/file_options/parse_utils.rs  |  13 +++
 datafusion/sqllogictest/test_files/copy.slt        |  10 +-
 4 files changed, 205 insertions(+), 23 deletions(-)

diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 29ee73f80f..ab792bdca7 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -377,6 +377,126 @@ mod tests {
         Ok(())
     }
 
+    /// Column-specific writer options, keyed as `<option>::<column path>` where the
+    /// path is dot-separated (e.g. `col2.nested`), must reach the parquet
+    /// `WriterProperties` for exactly that column. For every option at least one value
+    /// differs from the default, so an option that is silently ignored fails the test.
+    #[test]
+    fn test_writeroptions_parquet_column_specific() -> Result<()> {
+        let option_map: HashMap<String, String> = [
+            ("bloom_filter_enabled::col1", "true"),
+            ("bloom_filter_enabled::col2.nested", "true"),
+            ("encoding::col1", "plain"),
+            ("encoding::col2.nested", "rle"),
+            ("dictionary_enabled::col1", "true"),
+            // dictionaries default to enabled; `false` makes the override observable
+            ("dictionary_enabled::col2.nested", "false"),
+            ("compression::col1", "zstd(4)"),
+            ("compression::col2.nested", "zstd(10)"),
+            // `page` is the default level; `chunk` makes the override observable
+            ("statistics_enabled::col1", "chunk"),
+            ("statistics_enabled::col2.nested", "none"),
+            ("bloom_filter_fpp::col1", "0.123"),
+            ("bloom_filter_fpp::col2.nested", "0.456"),
+            ("bloom_filter_ndv::col1", "123"),
+            ("bloom_filter_ndv::col2.nested", "456"),
+        ]
+        .into_iter()
+        .map(|(key, value)| (key.to_owned(), value.to_owned()))
+        .collect();
+
+        let options = StatementOptions::from(&option_map);
+        let config = ConfigOptions::new();
+
+        let parquet_options = ParquetWriterOptions::try_from((&config, &options))?;
+        let properties = parquet_options.writer_options();
+
+        let col1 = ColumnPath::from(vec!["col1".to_owned()]);
+        let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);
+
+        // Verify the expected options propagated down to parquet crate WriterProperties struct
+        properties
+            .bloom_filter_properties(&col1)
+            .expect("expected bloom filter enabled for col1");
+
+        properties
+            .bloom_filter_properties(&col2_nested)
+            .expect("expected bloom filter enabled for col2_nested");
+
+        assert_eq!(
+            properties.encoding(&col1).expect("expected encoding"),
+            Encoding::PLAIN
+        );
+
+        assert_eq!(
+            properties
+                .encoding(&col2_nested)
+                .expect("expected encoding"),
+            Encoding::RLE
+        );
+
+        assert!(properties.dictionary_enabled(&col1));
+        assert!(!properties.dictionary_enabled(&col2_nested));
+
+        assert_eq!(
+            properties.compression(&col1),
+            Compression::ZSTD(ZstdLevel::try_new(4_i32)?)
+        );
+
+        assert_eq!(
+            properties.compression(&col2_nested),
+            Compression::ZSTD(ZstdLevel::try_new(10_i32)?)
+        );
+
+        assert_eq!(
+            properties.statistics_enabled(&col1),
+            EnabledStatistics::Chunk
+        );
+
+        assert_eq!(
+            properties.statistics_enabled(&col2_nested),
+            EnabledStatistics::None
+        );
+
+        assert_eq!(
+            properties
+                .bloom_filter_properties(&col1)
+                .expect("expected bloom properties!")
+                .fpp,
+            0.123
+        );
+
+        assert_eq!(
+            properties
+                .bloom_filter_properties(&col2_nested)
+                .expect("expected bloom properties!")
+                .fpp,
+            0.456
+        );
+
+        assert_eq!(
+            properties
+                .bloom_filter_properties(&col1)
+                .expect("expected bloom properties!")
+                .ndv,
+            123
+        );
+
+        assert_eq!(
+            properties
+                .bloom_filter_properties(&col2_nested)
+                .expect("expected bloom properties!")
+                .ndv,
+            456
+        );
+
+        // Options scoped to col1/col2.nested must not leak to other columns.
+        let other = ColumnPath::from(vec!["other".to_owned()]);
+        assert!(properties.bloom_filter_properties(&other).is_none());
+
+        Ok(())
+    }
+
     #[test]
     fn test_writeroptions_csv_from_statement_options() -> Result<()> {
         let mut option_map: HashMap<String, String> = HashMap::new();
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index fed773f29e..5cbbdf365e 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -28,7 +28,7 @@ use crate::{
     DataFusionError, Result,
 };
 
-use super::StatementOptions;
+use super::{parse_utils::split_option_and_column_path, StatementOptions};
 
 /// Options for writing parquet files
 #[derive(Clone, Debug)]
@@ -121,6 +121,7 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
         let statement_options = configs_and_statement_options.1;
         let mut builder = default_builder(configs)?;
         for (option, value) in &statement_options.options {
+            let (option, col_path) = split_option_and_column_path(option);
             builder = match option.to_lowercase().as_str(){
                 "max_row_group_size" => builder
                     .set_max_row_group_size(value.parse()
@@ -144,27 +145,67 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
                 "data_page_row_count_limit" => builder
                     .set_data_page_row_count_limit(value.parse()
                     .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
-                "bloom_filter_enabled" => builder
-                    .set_bloom_filter_enabled(value.parse()
-                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
-                "encoding" => builder
-                    .set_encoding(parse_encoding_string(value)?),
-                "dictionary_enabled" => builder
-                    .set_dictionary_enabled(value.parse()
-                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
-                "compression" => builder
-                    .set_compression(parse_compression_string(value)?),
-                "statistics_enabled" => builder
-                    .set_statistics_enabled(parse_statistics_string(value)?),
-                "max_statistics_size" => builder
-                    .set_max_statistics_size(value.parse()
-                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
-                "bloom_filter_fpp" => builder
-                    .set_bloom_filter_fpp(value.parse()
-                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?),
-                "bloom_filter_ndv" => builder
-                    .set_bloom_filter_ndv(value.parse()
-                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?),
+                "bloom_filter_enabled" => {
+                    let parsed_value = value.parse()
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
+                    match col_path{
+                        Some(path) => builder.set_column_bloom_filter_enabled(path, parsed_value),
+                        None => builder.set_bloom_filter_enabled(parsed_value)
+                    }
+                },
+                "encoding" => {
+                    let parsed_encoding = parse_encoding_string(value)?;
+                    match col_path{
+                        Some(path) => builder.set_column_encoding(path, parsed_encoding),
+                        None => builder.set_encoding(parsed_encoding)
+                    }
+                },
+                "dictionary_enabled" => {
+                    let parsed_value = value.parse()
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
+                    match col_path{
+                        Some(path) => builder.set_column_dictionary_enabled(path, parsed_value),
+                        None => builder.set_dictionary_enabled(parsed_value)
+                    }
+                },
+                "compression" => {
+                    let parsed_compression = parse_compression_string(value)?;
+                    match col_path{
+                        Some(path) => builder.set_column_compression(path, parsed_compression),
+                        None => builder.set_compression(parsed_compression)
+                    }
+                },
+                "statistics_enabled" => {
+                    let parsed_value = parse_statistics_string(value)?;
+                    match col_path{
+                        Some(path) => builder.set_column_statistics_enabled(path, parsed_value),
+                        None => builder.set_statistics_enabled(parsed_value)
+                    }
+                },
+                "max_statistics_size" => {
+                    let parsed_value = value.parse()
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?;
+                    match col_path{
+                        Some(path) => builder.set_column_max_statistics_size(path, parsed_value),
+                        None => builder.set_max_statistics_size(parsed_value)
+                    }
+                },
+                "bloom_filter_fpp" => {
+                    let parsed_value = value.parse()
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?;
+                    match col_path{
+                        Some(path) => builder.set_column_bloom_filter_fpp(path, parsed_value),
+                        None => builder.set_bloom_filter_fpp(parsed_value)
+                    }
+                },
+                "bloom_filter_ndv" => {
+                    let parsed_value = value.parse()
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?;
+                    match col_path{
+                        Some(path) => builder.set_column_bloom_filter_ndv(path, parsed_value),
+                        None => builder.set_bloom_filter_ndv(parsed_value)
+                    }
+                },
                 _ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for Parquet format!")))
             }
         }
diff --git a/datafusion/common/src/file_options/parse_utils.rs b/datafusion/common/src/file_options/parse_utils.rs
index da8d31436b..5e47d17817 100644
--- a/datafusion/common/src/file_options/parse_utils.rs
+++ b/datafusion/common/src/file_options/parse_utils.rs
@@ -20,6 +20,7 @@
 use parquet::{
     basic::{BrotliLevel, GzipLevel, ZstdLevel},
     file::properties::{EnabledStatistics, WriterVersion},
+    schema::types::ColumnPath,
 };
 
 use crate::{DataFusionError, Result};
@@ -181,3 +182,15 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatis
         ))),
     }
 }
+
+/// Splits `opt::a.b` into `opt` and column path `[a, b]`, after removing single quotes.
+pub(crate) fn split_option_and_column_path(
+    str_setting: &str,
+) -> (String, Option<ColumnPath>) {
+    let setting = str_setting.replace('\'', "");
+    if let Some((option, path)) = setting.split_once("::") {
+        let parts = path.split('.').map(str::to_owned).collect();
+        return (option.to_owned(), Some(ColumnPath::new(parts)));
+    }
+    (setting, None)
+}
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index f095552dad..3ade43b4e8 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -70,7 +70,9 @@ COPY source_table
 TO 'test_files/scratch/copy/table_with_options' 
 (format parquet,
 single_file_output false,
-compression 'snappy',
+compression snappy,
+'compression::col1' 'zstd(5)',
+'compression::col2' snappy,
 max_row_group_size 12345,
 data_pagesize_limit 1234,
 write_batch_size 1234,
@@ -80,9 +82,15 @@ created_by 'DF copy.slt',
 column_index_truncate_length 123,
 data_page_row_count_limit 1234,
 bloom_filter_enabled true,
+'bloom_filter_enabled::col1' false,
+'bloom_filter_fpp::col2' 0.456,
+'bloom_filter_ndv::col2' 456,
 encoding plain,
+'encoding::col1' DELTA_BINARY_PACKED,
+'dictionary_enabled::col2' true,
 dictionary_enabled false,
 statistics_enabled page,
+'statistics_enabled::col2' none,
 max_statistics_size 123,
 bloom_filter_fpp 0.001,
 bloom_filter_ndv 100