Posted to commits@arrow.apache.org by tu...@apache.org on 2023/12/07 13:33:49 UTC

(arrow-rs) branch master updated: Parquet: Ensure page statistics are written only when configured from the Arrow Writer (#5181)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 490c080e5b Parquet: Ensure page statistics are written only when configured from the Arrow Writer (#5181)
490c080e5b is described below

commit 490c080e5ba7a50efc862da9508e6669900549ee
Author: Adam Gutglick <ad...@gmail.com>
AuthorDate: Thu Dec 7 15:33:42 2023 +0200

    Parquet: Ensure page statistics are written only when configured from the Arrow Writer (#5181)
    
    * Issue fix and tests
    
    * Cleanup tests
---
 parquet/src/arrow/arrow_writer/mod.rs | 143 +++++++++++++++++++++++++++++++++-
 parquet/src/column/writer/mod.rs      |  29 +++----
 2 files changed, 158 insertions(+), 14 deletions(-)
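
For context, the statistics level at issue is configured on WriterProperties before the
ArrowWriter is constructed. The following is a minimal sketch of the configuration that the
new tests below exercise; the specific calls are taken from those tests, while the crate
paths (arrow_array, arrow_schema, parquet) assume a recent arrow-rs release and may need
adjusting:

    use std::sync::Arc;

    use arrow_array::{ArrayRef, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::{EnabledStatistics, WriterProperties};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as ArrayRef,
            ],
        )?;

        // Disable statistics globally, then re-enable page-level statistics for
        // column "a" only. With this fix the Arrow writer honours the per-column
        // setting: only "a" gets page statistics and a column index entry.
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .build();

        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }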

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index ea7b1eee99..e6e95d5099 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -916,8 +916,9 @@ mod tests {
     use crate::basic::Encoding;
     use crate::data_type::AsBytes;
     use crate::file::metadata::ParquetMetaData;
+    use crate::file::page_index::index::Index;
     use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::{ReaderProperties, WriterVersion};
+    use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
     use crate::file::serialized_reader::ReadOptionsBuilder;
     use crate::file::{
         reader::{FileReader, SerializedFileReader},
@@ -2738,4 +2739,144 @@ mod tests {
         assert_eq!(index[0][0].len(), 1); // 1 page
         assert_eq!(index[0][1].len(), 1); // 1 page
     }
+
+    #[test]
+    fn test_disabled_statistics_with_page() {
+        let file_schema = Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Utf8, true),
+        ]);
+        let file_schema = Arc::new(file_schema);
+
+        let batch = RecordBatch::try_new(
+            file_schema.clone(),
+            vec![
+                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
+                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
+            ],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::None)
+            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
+            .build();
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+
+        let metadata = writer.close().unwrap();
+        assert_eq!(metadata.row_groups.len(), 1);
+        let row_group = &metadata.row_groups[0];
+        assert_eq!(row_group.columns.len(), 2);
+        // Column "a" has both offset and column index, as requested
+        assert!(row_group.columns[0].offset_index_offset.is_some());
+        assert!(row_group.columns[0].column_index_offset.is_some());
+        // Column "b" should only have offset index
+        assert!(row_group.columns[1].offset_index_offset.is_some());
+        assert!(row_group.columns[1].column_index_offset.is_none());
+
+        let options = ReadOptionsBuilder::new().with_page_index().build();
+        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
+
+        let row_group = reader.get_row_group(0).unwrap();
+        let a_col = row_group.metadata().column(0);
+        let b_col = row_group.metadata().column(1);
+
+        // Column chunk of column "a" should have chunk level statistics
+        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
+            let min = byte_array_stats.min();
+            let max = byte_array_stats.max();
+
+            assert_eq!(min.as_bytes(), &[b'a']);
+            assert_eq!(max.as_bytes(), &[b'd']);
+        } else {
+            panic!("expecting Statistics::ByteArray");
+        }
+
+        // The column chunk for column "b" shouldn't have statistics
+        assert!(b_col.statistics().is_none());
+
+        let offset_index = reader.metadata().offset_index().unwrap();
+        assert_eq!(offset_index.len(), 1); // 1 row group
+        assert_eq!(offset_index[0].len(), 2); // 2 columns
+
+        let column_index = reader.metadata().column_index().unwrap();
+        assert_eq!(column_index.len(), 1); // 1 row group
+        assert_eq!(column_index[0].len(), 2); // 2 columns
+
+        let a_idx = &column_index[0][0];
+        assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
+        let b_idx = &column_index[0][1];
+        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
+    }
+
+    #[test]
+    fn test_disabled_statistics_with_chunk() {
+        let file_schema = Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Utf8, true),
+        ]);
+        let file_schema = Arc::new(file_schema);
+
+        let batch = RecordBatch::try_new(
+            file_schema.clone(),
+            vec![
+                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
+                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
+            ],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::None)
+            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
+            .build();
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+
+        let metadata = writer.close().unwrap();
+        assert_eq!(metadata.row_groups.len(), 1);
+        let row_group = &metadata.row_groups[0];
+        assert_eq!(row_group.columns.len(), 2);
+        // Column "a" should only have offset index
+        assert!(row_group.columns[0].offset_index_offset.is_some());
+        assert!(row_group.columns[0].column_index_offset.is_none());
+        // Column "b" should only have offset index
+        assert!(row_group.columns[1].offset_index_offset.is_some());
+        assert!(row_group.columns[1].column_index_offset.is_none());
+
+        let options = ReadOptionsBuilder::new().with_page_index().build();
+        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
+
+        let row_group = reader.get_row_group(0).unwrap();
+        let a_col = row_group.metadata().column(0);
+        let b_col = row_group.metadata().column(1);
+
+        // Column chunk of column "a" should have chunk level statistics
+        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
+            let min = byte_array_stats.min();
+            let max = byte_array_stats.max();
+
+            assert_eq!(min.as_bytes(), &[b'a']);
+            assert_eq!(max.as_bytes(), &[b'd']);
+        } else {
+            panic!("expecting Statistics::ByteArray");
+        }
+
+        // The column chunk for column "b" shouldn't have statistics
+        assert!(b_col.statistics().is_none());
+
+        let column_index = reader.metadata().column_index().unwrap();
+        assert_eq!(column_index.len(), 1); // 1 row group
+        assert_eq!(column_index[0].len(), 2); // 2 columns
+
+        let a_idx = &column_index[0][0];
+        assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
+        let b_idx = &column_index[0][1];
+        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
+    }
 }
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 5dd7747c6f..531af4bd46 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -764,19 +764,22 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
 
-        let page_statistics = match (values_data.min_value, values_data.max_value) {
-            (Some(min), Some(max)) => {
-                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
-                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-                Some(ValueStatistics::new(
-                    Some(min),
-                    Some(max),
-                    None,
-                    self.page_metrics.num_page_nulls,
-                    false,
-                ))
-            }
-            _ => None,
+        let page_statistics = if let (Some(min), Some(max)) =
+            (values_data.min_value, values_data.max_value)
+        {
+            // Update chunk level statistics
+            update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
+            update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
+
+            (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new(
+                Some(min),
+                Some(max),
+                None,
+                self.page_metrics.num_page_nulls,
+                false,
+            ))
+        } else {
+            None
         };
 
         // update column and offset index
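
The substance of the change above is the gate on self.statistics_enabled: chunk-level
min/max are still folded in for every page, but a per-page ValueStatistics is only built
when page-level statistics are enabled for the column. A standalone sketch of that gating
pattern (hypothetical names, not the crate's internal types):

    #[derive(PartialEq)]
    enum StatsLevel {
        None,
        Chunk,
        Page,
    }

    struct PageStats {
        min: i64,
        max: i64,
    }

    // Chunk-level min/max are always updated; per-page statistics are only
    // returned when page-level statistics are requested.
    fn finish_page(
        level: StatsLevel,
        page_min: i64,
        page_max: i64,
        chunk_min: &mut Option<i64>,
        chunk_max: &mut Option<i64>,
    ) -> Option<PageStats> {
        *chunk_min = Some(chunk_min.map_or(page_min, |m| m.min(page_min)));
        *chunk_max = Some(chunk_max.map_or(page_max, |m| m.max(page_max)));
        (level == StatsLevel::Page).then_some(PageStats {
            min: page_min,
            max: page_max,
        })
    }

    fn main() {
        let (mut cmin, mut cmax) = (None, None);
        // Chunk level: chunk min/max advance, but no page statistics come back.
        assert!(finish_page(StatsLevel::Chunk, 1, 9, &mut cmin, &mut cmax).is_none());
        // Page level: page statistics are produced as well.
        let p = finish_page(StatsLevel::Page, 0, 5, &mut cmin, &mut cmax).unwrap();
        assert_eq!((p.min, p.max), (0, 5));
        assert_eq!((cmin, cmax), (Some(0), Some(9)));
        let _ = StatsLevel::None; // statistics fully disabled is the third level
    }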