You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/12/07 13:33:49 UTC
(arrow-rs) branch master updated: Parquet: Ensure page statistics are written only when configured from the Arrow Writer (#5181)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 490c080e5b Parquet: Ensure page statistics are written only when configured from the Arrow Writer (#5181)
490c080e5b is described below
commit 490c080e5ba7a50efc862da9508e6669900549ee
Author: Adam Gutglick <ad...@gmail.com>
AuthorDate: Thu Dec 7 15:33:42 2023 +0200
Parquet: Ensure page statistics are written only when configured from the Arrow Writer (#5181)
* Issue fix and tests
* Cleanup tests
---
parquet/src/arrow/arrow_writer/mod.rs | 143 +++++++++++++++++++++++++++++++++-
parquet/src/column/writer/mod.rs | 29 +++----
2 files changed, 158 insertions(+), 14 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index ea7b1eee99..e6e95d5099 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -916,8 +916,9 @@ mod tests {
use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::file::metadata::ParquetMetaData;
+ use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::read_pages_locations;
- use crate::file::properties::{ReaderProperties, WriterVersion};
+ use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
reader::{FileReader, SerializedFileReader},
@@ -2738,4 +2739,144 @@ mod tests {
assert_eq!(index[0][0].len(), 1); // 1 page
assert_eq!(index[0][1].len(), 1); // 1 page
}
+
+ #[test]
+ fn test_disabled_statistics_with_page() {
+ let file_schema = Schema::new(vec![
+ Field::new("a", DataType::Utf8, true),
+ Field::new("b", DataType::Utf8, true),
+ ]);
+ let file_schema = Arc::new(file_schema);
+
+ let batch = RecordBatch::try_new(
+ file_schema.clone(),
+ vec![
+ Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
+ Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
+ ],
+ )
+ .unwrap();
+
+ let props = WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::None)
+ .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
+ .build();
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+
+ let metadata = writer.close().unwrap();
+ assert_eq!(metadata.row_groups.len(), 1);
+ let row_group = &metadata.row_groups[0];
+ assert_eq!(row_group.columns.len(), 2);
+ // Column "a" has both offset and column index, as requested
+ assert!(row_group.columns[0].offset_index_offset.is_some());
+ assert!(row_group.columns[0].column_index_offset.is_some());
+ // Column "b" should only have offset index
+ assert!(row_group.columns[1].offset_index_offset.is_some());
+ assert!(row_group.columns[1].column_index_offset.is_none());
+
+ let options = ReadOptionsBuilder::new().with_page_index().build();
+ let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
+
+ let row_group = reader.get_row_group(0).unwrap();
+ let a_col = row_group.metadata().column(0);
+ let b_col = row_group.metadata().column(1);
+
+ // Column chunk of column "a" should have chunk level statistics
+ if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
+ let min = byte_array_stats.min();
+ let max = byte_array_stats.max();
+
+ assert_eq!(min.as_bytes(), &[b'a']);
+ assert_eq!(max.as_bytes(), &[b'd']);
+ } else {
+ panic!("expecting Statistics::ByteArray");
+ }
+
+ // The column chunk for column "b" shouldn't have statistics
+ assert!(b_col.statistics().is_none());
+
+ let offset_index = reader.metadata().offset_index().unwrap();
+ assert_eq!(offset_index.len(), 1); // 1 row group
+ assert_eq!(offset_index[0].len(), 2); // 2 columns
+
+ let column_index = reader.metadata().column_index().unwrap();
+ assert_eq!(column_index.len(), 1); // 1 row group
+ assert_eq!(column_index[0].len(), 2); // 2 columns
+
+ let a_idx = &column_index[0][0];
+ assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
+ let b_idx = &column_index[0][1];
+ assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
+ }
+
+ #[test]
+ fn test_disabled_statistics_with_chunk() {
+ let file_schema = Schema::new(vec![
+ Field::new("a", DataType::Utf8, true),
+ Field::new("b", DataType::Utf8, true),
+ ]);
+ let file_schema = Arc::new(file_schema);
+
+ let batch = RecordBatch::try_new(
+ file_schema.clone(),
+ vec![
+ Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
+ Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
+ ],
+ )
+ .unwrap();
+
+ let props = WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::None)
+ .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
+ .build();
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+
+ let metadata = writer.close().unwrap();
+ assert_eq!(metadata.row_groups.len(), 1);
+ let row_group = &metadata.row_groups[0];
+ assert_eq!(row_group.columns.len(), 2);
+ // Column "a" should only have offset index
+ assert!(row_group.columns[0].offset_index_offset.is_some());
+ assert!(row_group.columns[0].column_index_offset.is_none());
+ // Column "b" should only have offset index
+ assert!(row_group.columns[1].offset_index_offset.is_some());
+ assert!(row_group.columns[1].column_index_offset.is_none());
+
+ let options = ReadOptionsBuilder::new().with_page_index().build();
+ let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
+
+ let row_group = reader.get_row_group(0).unwrap();
+ let a_col = row_group.metadata().column(0);
+ let b_col = row_group.metadata().column(1);
+
+ // Column chunk of column "a" should have chunk level statistics
+ if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
+ let min = byte_array_stats.min();
+ let max = byte_array_stats.max();
+
+ assert_eq!(min.as_bytes(), &[b'a']);
+ assert_eq!(max.as_bytes(), &[b'd']);
+ } else {
+ panic!("expecting Statistics::ByteArray");
+ }
+
+ // The column chunk for column "b" shouldn't have statistics
+ assert!(b_col.statistics().is_none());
+
+ let column_index = reader.metadata().column_index().unwrap();
+ assert_eq!(column_index.len(), 1); // 1 row group
+ assert_eq!(column_index[0].len(), 2); // 2 columns
+
+ let a_idx = &column_index[0][0];
+ assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
+ let b_idx = &column_index[0][1];
+ assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
+ }
}
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 5dd7747c6f..531af4bd46 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -764,19 +764,22 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
- let page_statistics = match (values_data.min_value, values_data.max_value) {
- (Some(min), Some(max)) => {
- update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
- update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
- Some(ValueStatistics::new(
- Some(min),
- Some(max),
- None,
- self.page_metrics.num_page_nulls,
- false,
- ))
- }
- _ => None,
+ let page_statistics = if let (Some(min), Some(max)) =
+ (values_data.min_value, values_data.max_value)
+ {
+ // Update chunk level statistics
+ update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
+ update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
+
+ (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new(
+ Some(min),
+ Some(max),
+ None,
+ self.page_metrics.num_page_nulls,
+ false,
+ ))
+ } else {
+ None
};
// update column and offset index