You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/29 17:49:48 UTC
[arrow-rs] branch master updated: Fix decimals min max statistics (#1621)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d8d6499d0 Fix decimals min max statistics (#1621)
d8d6499d0 is described below
commit d8d6499d081dbcf0dd626fa63aad24b4996d19d7
Author: Atef Sawaed <at...@gmail.com>
AuthorDate: Fri Apr 29 20:49:43 2022 +0300
Fix decimals min max statistics (#1621)
* Fix incorrect writing of min/max statistics
* Refactor
* Decimals Byte array comparison
* Add Decimals test
* Use slice instead of vector
* Fix build error
* Fix build error
* Coding Style
* More tests
* Refactor
* Improve code readability
Co-authored-by: Atef Sawaed <at...@microsoft.com>
---
parquet/src/column/writer.rs | 275 ++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 274 insertions(+), 1 deletion(-)
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 016d72e92..13ea85157 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -18,7 +18,7 @@
//! Contains column writer API.
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};
-use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
+use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::compression::{create_codec, Codec};
use crate::data_type::private::ParquetValueType;
@@ -1006,6 +1006,41 @@ impl<T: DataType> ColumnWriterImpl<T> {
return a.as_u64().unwrap() > b.as_u64().unwrap();
}
}
+
+ match self.descr.converted_type() {
+ ConvertedType::UINT_8
+ | ConvertedType::UINT_16
+ | ConvertedType::UINT_32
+ | ConvertedType::UINT_64 => {
+ return a.as_u64().unwrap() > b.as_u64().unwrap();
+ }
+ _ => {}
+ };
+
+ if let Some(LogicalType::Decimal { .. }) = self.descr.logical_type() {
+ match self.descr.physical_type() {
+ Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+ return compare_greater_byte_array_decimals(
+ a.as_bytes(),
+ b.as_bytes(),
+ );
+ }
+ _ => {}
+ };
+ }
+
+ if self.descr.converted_type() == ConvertedType::DECIMAL {
+ match self.descr.physical_type() {
+ Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+ return compare_greater_byte_array_decimals(
+ a.as_bytes(),
+ b.as_bytes(),
+ );
+ }
+ _ => {}
+ };
+ };
+
a > b
}
}
@@ -1049,6 +1084,54 @@ fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
}
}
/// Returns `true` when decimal `a` is strictly greater than decimal `b`.
///
/// Both slices are interpreted as big-endian two's-complement integers.
/// The two operands may have different widths; the shorter one is treated
/// as if it were sign-extended to the width of the longer one.
///
/// An empty slice is ordered before every non-empty slice, and two empty
/// slices are considered equal (so the result is `false`).
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    if a.is_empty() || b.is_empty() {
        return !a.is_empty();
    }

    let first_a = a[0];
    let first_b = b[0];

    // Short circuit when the signs differ, or when two equal-width values
    // already differ in their most significant byte. The equal-width
    // requirement matters because of sign extension: 0xFF90 and 0x90 encode
    // the same value even though their first bytes differ.
    if (0x80 & first_a) != (0x80 & first_b)
        || (a.len() == b.len() && first_a != first_b)
    {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both values have the same sign.
    let negative = (first_a as i8) < 0;
    let extension: u8 = if negative { 0xFF } else { 0x00 };

    // Split the longer operand into the bytes that have no counterpart in
    // the shorter one (`lead`) and the part aligned with it (`tail`).
    // For equal widths `lead` is empty and `tail` is the whole of `b`.
    let a_longer = a.len() > b.len();
    let (longer, shorter) = if a_longer { (a, b) } else { (b, a) };
    let (lead, tail) = longer.split_at(longer.len() - shorter.len());

    // If the extra bytes are anything but sign extension, the longer value
    // has the larger magnitude: it is the greater one when both are
    // non-negative and the smaller one when both are negative.
    if lead.iter().any(|&byte| byte != extension) {
        return a_longer != negative;
    }

    // The remaining parts have equal width and the same implied sign, so an
    // unsigned lexicographic comparison yields the signed ordering.
    if a_longer {
        tail > shorter
    } else {
        shorter > tail
    }
}
+
#[cfg(test)]
mod tests {
use rand::distributions::uniform::SampleUniform;
@@ -1475,6 +1558,84 @@ mod tests {
}
}
+    /// Writes four 16-byte decimal values (two with a leading `0xFF` run, an
+    /// all-zero value and one with a leading `0x00` run) and checks which of
+    /// them end up as the column chunk's min and max statistics.
+    #[test]
+    fn test_column_writer_check_byte_array_min_max() {
+        let expected_min = ByteArray::from(vec![
+            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
+            35u8, 231u8, 90u8, 0u8, 0u8,
+        ]);
+        let other_negative = ByteArray::from(vec![
+            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
+            152u8, 177u8, 56u8, 0u8, 0u8,
+        ]);
+        let zero = ByteArray::from(vec![0u8; 16]);
+        let expected_max = ByteArray::from(vec![
+            0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8, 44u8,
+            0u8, 0u8,
+        ]);
+
+        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(
+            get_test_page_writer(),
+            0,
+            0,
+            Arc::new(WriterProperties::builder().build()),
+        );
+        let batch = [
+            expected_min.clone(),
+            other_negative,
+            zero,
+            expected_max.clone(),
+        ];
+        writer.write_batch(&batch, None, None).unwrap();
+        let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
+
+        let stats = match metadata.statistics() {
+            Some(stats) => stats,
+            None => panic!("metadata missing statistics"),
+        };
+        assert!(stats.has_min_max_set());
+        match stats {
+            Statistics::ByteArray(stats) => {
+                assert_eq!(stats.min(), &expected_min);
+                assert_eq!(stats.max(), &expected_max);
+            }
+            _ => panic!("expecting Statistics::ByteArray"),
+        }
+    }
+
+    /// Writes `0..=5` to an `Int32Type` column whose descriptor carries only
+    /// `ConvertedType::UINT_32` and checks the resulting min/max statistics.
+    #[test]
+    fn test_column_writer_uint32_converted_type_min_max() {
+        let mut writer =
+            get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
+                get_test_page_writer(),
+                0,
+                0,
+                Arc::new(WriterProperties::builder().build()),
+            );
+        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
+        let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
+
+        let stats = match metadata.statistics() {
+            Some(stats) => stats,
+            None => panic!("metadata missing statistics"),
+        };
+        assert!(stats.has_min_max_set());
+        match stats {
+            Statistics::Int32(stats) => {
+                assert_eq!(stats.min(), &0,);
+                assert_eq!(stats.max(), &5,);
+            }
+            _ => panic!("expecting Statistics::Int32"),
+        }
+    }
+
#[test]
fn test_column_writer_precalculated_statistics() {
let page_writer = get_test_page_writer();
@@ -1887,6 +2048,28 @@ mod tests {
assert!(matches!(stats, Statistics::Double(_)));
}
+    /// Table-driven check of `compare_greater_byte_array_decimals`.
+    #[test]
+    fn test_compare_greater_byte_array_decimals() {
+        // Each row is (a, b, expected result of `a > b`).
+        let cases: [(&[u8], &[u8], bool); 9] = [
+            (&[], &[], false),
+            (&[1u8], &[], true),
+            (&[], &[1u8], false),
+            (&[1u8], &[0u8], true),
+            (&[1u8], &[1u8], false),
+            (&[1u8, 0u8], &[0u8], true),
+            (&[0u8, 1u8], &[1u8, 0u8], false),
+            (&[255u8, 35u8, 0u8, 0u8], &[0u8], false),
+            (&[0u8], &[255u8, 35u8, 0u8, 0u8], true),
+        ];
+
+        for &(a, b, expected) in cases.iter() {
+            assert_eq!(compare_greater_byte_array_decimals(a, b), expected);
+        }
+    }
+
/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
@@ -2153,4 +2336,94 @@ mod tests {
panic!("metadata missing statistics");
}
}
+
+    /// Builds a typed column writer on top of the test Decimal column
+    /// descriptor produced by `get_test_decimals_column_descr`.
+    fn get_test_decimals_column_writer<T: DataType>(
+        page_writer: Box<dyn PageWriter>,
+        max_def_level: i16,
+        max_rep_level: i16,
+        props: WriterPropertiesPtr,
+    ) -> ColumnWriterImpl<T> {
+        let descriptor = get_test_decimals_column_descr::<T>(max_def_level, max_rep_level);
+        let untyped = get_column_writer(Arc::new(descriptor), props, page_writer);
+        get_typed_column_writer::<T>(untyped)
+    }
+
+    /// Builds a typed column reader on top of the test Decimal column
+    /// descriptor produced by `get_test_decimals_column_descr`.
+    fn get_test_decimals_column_reader<T: DataType>(
+        page_reader: Box<dyn PageReader>,
+        max_def_level: i16,
+        max_rep_level: i16,
+    ) -> ColumnReaderImpl<T> {
+        let descriptor = get_test_decimals_column_descr::<T>(max_def_level, max_rep_level);
+        let untyped = get_column_reader(Arc::new(descriptor), page_reader);
+        get_typed_column_reader::<T>(untyped)
+    }
+
+    /// Creates a descriptor for a primitive column named `col` with
+    /// physical type `T`, length 16 and a Decimal logical type
+    /// (precision 3, scale 2).
+    fn get_test_decimals_column_descr<T: DataType>(
+        max_def_level: i16,
+        max_rep_level: i16,
+    ) -> ColumnDescriptor {
+        let decimal = LogicalType::Decimal {
+            scale: 2,
+            precision: 3,
+        };
+        let schema = SchemaType::primitive_type_builder("col", T::get_physical_type())
+            .with_length(16)
+            .with_logical_type(Some(decimal))
+            .with_scale(2)
+            .with_precision(3)
+            .build()
+            .unwrap();
+        ColumnDescriptor::new(
+            Arc::new(schema),
+            max_def_level,
+            max_rep_level,
+            ColumnPath::from("col"),
+        )
+    }
+
+    /// Builds a typed column writer for a column whose unsignedness is
+    /// declared only through `ConvertedType::UINT_32`.
+    fn get_test_unsigned_int_given_as_converted_column_writer<T: DataType>(
+        page_writer: Box<dyn PageWriter>,
+        max_def_level: i16,
+        max_rep_level: i16,
+        props: WriterPropertiesPtr,
+    ) -> ColumnWriterImpl<T> {
+        let descriptor = get_test_converted_type_unsigned_integer_column_descr::<T>(
+            max_def_level,
+            max_rep_level,
+        );
+        let untyped = get_column_writer(Arc::new(descriptor), props, page_writer);
+        get_typed_column_writer::<T>(untyped)
+    }
+
+    /// Builds a typed column reader for a column whose unsignedness is
+    /// declared only through `ConvertedType::UINT_32`.
+    fn get_test_unsigned_int_given_as_converted_column_reader<T: DataType>(
+        page_reader: Box<dyn PageReader>,
+        max_def_level: i16,
+        max_rep_level: i16,
+    ) -> ColumnReaderImpl<T> {
+        let descriptor = get_test_converted_type_unsigned_integer_column_descr::<T>(
+            max_def_level,
+            max_rep_level,
+        );
+        let untyped = get_column_reader(Arc::new(descriptor), page_reader);
+        get_typed_column_reader::<T>(untyped)
+    }
+
+    /// Creates a descriptor for a primitive column named `col` with
+    /// physical type `T` and `ConvertedType::UINT_32` (no logical type is set).
+    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
+        max_def_level: i16,
+        max_rep_level: i16,
+    ) -> ColumnDescriptor {
+        let schema = SchemaType::primitive_type_builder("col", T::get_physical_type())
+            .with_converted_type(ConvertedType::UINT_32)
+            .build()
+            .unwrap();
+        ColumnDescriptor::new(
+            Arc::new(schema),
+            max_def_level,
+            max_rep_level,
+            ColumnPath::from("col"),
+        )
+    }
}