You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/29 17:49:48 UTC

[arrow-rs] branch master updated: Fix decimals min max statistics (#1621)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d8d6499d0 Fix decimals min max statistics (#1621)
d8d6499d0 is described below

commit d8d6499d081dbcf0dd626fa63aad24b4996d19d7
Author: Atef Sawaed <at...@gmail.com>
AuthorDate: Fri Apr 29 20:49:43 2022 +0300

    Fix decimals min max statistics (#1621)
    
    * Fix incorrect writing of min/max statistics
    
    * Refactor
    
    * Decimals Byte array comparison
    
    * Add Decimals test
    
    * Use slice instead of vector
    
    * Fix build error
    
    * Fix build error
    
    * Coding Style
    
    * More tests
    
    * Refactor
    
    * Improve code readability
    
    Co-authored-by: Atef Sawaed <at...@microsoft.com>
---
 parquet/src/column/writer.rs | 275 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 274 insertions(+), 1 deletion(-)

diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 016d72e92..13ea85157 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -18,7 +18,7 @@
 //! Contains column writer API.
 use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};
 
-use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
+use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
 use crate::compression::{create_codec, Codec};
 use crate::data_type::private::ParquetValueType;
@@ -1006,6 +1006,41 @@ impl<T: DataType> ColumnWriterImpl<T> {
                 return a.as_u64().unwrap() > b.as_u64().unwrap();
             }
         }
+
+        match self.descr.converted_type() {
+            ConvertedType::UINT_8
+            | ConvertedType::UINT_16
+            | ConvertedType::UINT_32
+            | ConvertedType::UINT_64 => {
+                return a.as_u64().unwrap() > b.as_u64().unwrap();
+            }
+            _ => {}
+        };
+
+        if let Some(LogicalType::Decimal { .. }) = self.descr.logical_type() {
+            match self.descr.physical_type() {
+                Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+                    return compare_greater_byte_array_decimals(
+                        a.as_bytes(),
+                        b.as_bytes(),
+                    );
+                }
+                _ => {}
+            };
+        }
+
+        if self.descr.converted_type() == ConvertedType::DECIMAL {
+            match self.descr.physical_type() {
+                Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+                    return compare_greater_byte_array_decimals(
+                        a.as_bytes(),
+                        b.as_bytes(),
+                    );
+                }
+                _ => {}
+            };
+        };
+
         a > b
     }
 }
@@ -1049,6 +1084,54 @@ fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
     }
 }
 
+/// Signed "greater than" comparison of two big-endian two's-complement
+/// byte arrays, as used for decimals stored as `BYTE_ARRAY` or
+/// `FIXED_LEN_BYTE_ARRAY`.
+///
+/// Returns `true` only when `a` is strictly greater than `b`. The inputs may
+/// have different lengths; the shorter one is compared as if it were sign
+/// extended to the length of the longer one. An empty array is never greater
+/// than anything, and any non-empty array is greater than an empty one.
+fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
+    if a.is_empty() || b.is_empty() {
+        return !a.is_empty();
+    }
+
+    let first_a: u8 = a[0];
+    let first_b: u8 = b[0];
+
+    // We can short circuit for numbers of different sign, or for equal
+    // length arrays that have different first bytes.
+    // The equal length requirement is necessary for sign extension cases:
+    // 0xFF80 and 0x80 both encode -128 although their first bytes differ.
+    if (0x80 & first_a) != (0x80 & first_b)
+        || (a.len() == b.len() && first_a != first_b)
+    {
+        return (first_a as i8) > (first_b as i8);
+    }
+
+    // From here on both numbers have the same sign.
+    let negative = (first_a as i8) < 0;
+    let extension: u8 = if negative { 0xFF } else { 0 };
+
+    // Split the longer array into its extra leading bytes and a tail that is
+    // as long as the shorter array. If any extra byte is not pure sign
+    // extension, the longer array has the larger magnitude: it is the
+    // greater value for positive numbers and the smaller one for negative
+    // numbers. For equal lengths the lead is empty and nothing is stripped.
+    let (a_tail, b_tail) = if a.len() > b.len() {
+        let (lead, tail) = a.split_at(a.len() - b.len());
+        if lead.iter().any(|&x| x != extension) {
+            return !negative;
+        }
+        (tail, b)
+    } else {
+        let (lead, tail) = b.split_at(b.len() - a.len());
+        if lead.iter().any(|&x| x != extension) {
+            return negative;
+        }
+        (a, tail)
+    };
+
+    // Both slices now have the same length and represent numbers of the same
+    // sign, so unsigned lexicographical comparison gives the signed order.
+    a_tail > b_tail
+}
+
+
 #[cfg(test)]
 mod tests {
     use rand::distributions::uniform::SampleUniform;
@@ -1475,6 +1558,84 @@ mod tests {
         }
     }
 
+    /// Writes four 16-byte decimal values to a `BYTE_ARRAY` column and checks
+    /// that the recorded min/max are the value with the sign bit set and the
+    /// smallest bytes, and the value with the sign bit clear, respectively.
+    #[test]
+    fn test_column_writer_check_byte_array_min_max() {
+        // Two values with the sign bit set, all zeros, and one value with the
+        // sign bit clear.
+        let lowest: Vec<u8> = vec![
+            255, 255, 255, 255, 255, 255, 255, 255, 179, 172, 19, 35, 231, 90, 0, 0,
+        ];
+        let negative: Vec<u8> = vec![
+            255, 255, 255, 255, 255, 255, 255, 255, 228, 62, 146, 152, 177, 56, 0, 0,
+        ];
+        let zero: Vec<u8> = vec![0; 16];
+        let highest: Vec<u8> =
+            vec![0, 0, 0, 0, 0, 0, 0, 0, 41, 162, 36, 26, 246, 44, 0, 0];
+
+        let pages = get_test_page_writer();
+        let writer_props = Arc::new(WriterProperties::builder().build());
+        let mut column_writer = get_test_decimals_column_writer::<ByteArrayType>(
+            pages,
+            0,
+            0,
+            writer_props,
+        );
+
+        let batch = [
+            ByteArray::from(lowest.clone()),
+            ByteArray::from(negative),
+            ByteArray::from(zero),
+            ByteArray::from(highest.clone()),
+        ];
+        column_writer.write_batch(&batch, None, None).unwrap();
+
+        let (_bytes_written, _rows_written, metadata) = column_writer.close().unwrap();
+        let stats = match metadata.statistics() {
+            Some(stats) => stats,
+            None => panic!("metadata missing statistics"),
+        };
+        assert!(stats.has_min_max_set());
+        match stats {
+            Statistics::ByteArray(typed) => {
+                assert_eq!(typed.min(), &ByteArray::from(lowest));
+                assert_eq!(typed.max(), &ByteArray::from(highest));
+            }
+            _ => panic!("expecting Statistics::ByteArray"),
+        }
+    }
+
+    /// Writes a small `Int32` batch to a column whose `UINT_32` annotation is
+    /// given only as a converted type and checks the recorded min/max.
+    #[test]
+    fn test_column_writer_uint32_converted_type_min_max() {
+        let pages = get_test_page_writer();
+        let writer_props = Arc::new(WriterProperties::builder().build());
+        let mut column_writer =
+            get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
+                pages,
+                0,
+                0,
+                writer_props,
+            );
+
+        let batch = [0, 1, 2, 3, 4, 5];
+        column_writer.write_batch(&batch, None, None).unwrap();
+
+        let (_bytes_written, _rows_written, metadata) = column_writer.close().unwrap();
+        let stats = match metadata.statistics() {
+            Some(stats) => stats,
+            None => panic!("metadata missing statistics"),
+        };
+        assert!(stats.has_min_max_set());
+        match stats {
+            Statistics::Int32(typed) => {
+                assert_eq!(typed.min(), &0);
+                assert_eq!(typed.max(), &5);
+            }
+            _ => panic!("expecting Statistics::Int32"),
+        }
+    }
+
     #[test]
     fn test_column_writer_precalculated_statistics() {
         let page_writer = get_test_page_writer();
@@ -1887,6 +2048,28 @@ mod tests {
         assert!(matches!(stats, Statistics::Double(_)));
     }
 
+    /// Table-driven check of `compare_greater_byte_array_decimals`.
+    #[test]
+    fn test_compare_greater_byte_array_decimals() {
+        // Each row is (a, b, expected result of the comparison).
+        let cases: [(&[u8], &[u8], bool); 9] = [
+            (&[], &[], false),
+            (&[1u8], &[], true),
+            (&[], &[1u8], false),
+            (&[1u8], &[0u8], true),
+            (&[1u8], &[1u8], false),
+            (&[1u8, 0u8], &[0u8], true),
+            (&[0u8, 1u8], &[1u8, 0u8], false),
+            (&[255u8, 35u8, 0u8, 0u8], &[0u8], false),
+            (&[0u8], &[255u8, 35u8, 0u8, 0u8], true),
+        ];
+
+        for &(a, b, expected) in cases.iter() {
+            assert!(compare_greater_byte_array_decimals(a, b) == expected);
+        }
+    }
+
     /// Performs write-read roundtrip with randomly generated values and levels.
     /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
     /// for a column.
@@ -2153,4 +2336,94 @@ mod tests {
             panic!("metadata missing statistics");
         }
     }
+
+    /// Builds a typed column writer on top of the test Decimal column
+    /// descriptor created with the given definition and repetition levels.
+    fn get_test_decimals_column_writer<T: DataType>(
+        page_writer: Box<dyn PageWriter>,
+        max_def_level: i16,
+        max_rep_level: i16,
+        props: WriterPropertiesPtr,
+    ) -> ColumnWriterImpl<T> {
+        let decimal_descr =
+            get_test_decimals_column_descr::<T>(max_def_level, max_rep_level);
+        get_typed_column_writer::<T>(get_column_writer(
+            Arc::new(decimal_descr),
+            props,
+            page_writer,
+        ))
+    }
+
+    /// Builds a typed column reader on top of the test Decimal column
+    /// descriptor created with the given definition and repetition levels.
+    fn get_test_decimals_column_reader<T: DataType>(
+        page_reader: Box<dyn PageReader>,
+        max_def_level: i16,
+        max_rep_level: i16,
+    ) -> ColumnReaderImpl<T> {
+        let decimal_descr =
+            get_test_decimals_column_descr::<T>(max_def_level, max_rep_level);
+        get_typed_column_reader::<T>(get_column_reader(
+            Arc::new(decimal_descr),
+            page_reader,
+        ))
+    }
+
+    /// Builds the descriptor of a primitive column named "col" that is
+    /// annotated as Decimal (precision 3, scale 2) with a type length of 16.
+    fn get_test_decimals_column_descr<T: DataType>(
+        max_def_level: i16,
+        max_rep_level: i16,
+    ) -> ColumnDescriptor {
+        let decimal = LogicalType::Decimal {
+            scale: 2,
+            precision: 3,
+        };
+        let primitive =
+            SchemaType::primitive_type_builder("col", T::get_physical_type())
+                .with_length(16)
+                .with_logical_type(Some(decimal))
+                .with_scale(2)
+                .with_precision(3)
+                .build()
+                .unwrap();
+        ColumnDescriptor::new(
+            Arc::new(primitive),
+            max_def_level,
+            max_rep_level,
+            ColumnPath::from("col"),
+        )
+    }
+
+    /// Builds a typed column writer for a column whose UINT32 annotation is
+    /// supplied through the ConvertedType only.
+    fn get_test_unsigned_int_given_as_converted_column_writer<T: DataType>(
+        page_writer: Box<dyn PageWriter>,
+        max_def_level: i16,
+        max_rep_level: i16,
+        props: WriterPropertiesPtr,
+    ) -> ColumnWriterImpl<T> {
+        let uint_descr = get_test_converted_type_unsigned_integer_column_descr::<T>(
+            max_def_level,
+            max_rep_level,
+        );
+        get_typed_column_writer::<T>(get_column_writer(
+            Arc::new(uint_descr),
+            props,
+            page_writer,
+        ))
+    }
+
+    /// Builds a typed column reader for a column whose UINT32 annotation is
+    /// supplied through the ConvertedType only.
+    fn get_test_unsigned_int_given_as_converted_column_reader<T: DataType>(
+        page_reader: Box<dyn PageReader>,
+        max_def_level: i16,
+        max_rep_level: i16,
+    ) -> ColumnReaderImpl<T> {
+        let uint_descr = get_test_converted_type_unsigned_integer_column_descr::<T>(
+            max_def_level,
+            max_rep_level,
+        );
+        get_typed_column_reader::<T>(get_column_reader(
+            Arc::new(uint_descr),
+            page_reader,
+        ))
+    }
+
+    /// Builds the descriptor of a primitive column named "col" whose UINT32
+    /// annotation is supplied through the ConvertedType only.
+    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
+        max_def_level: i16,
+        max_rep_level: i16,
+    ) -> ColumnDescriptor {
+        let primitive =
+            SchemaType::primitive_type_builder("col", T::get_physical_type())
+                .with_converted_type(ConvertedType::UINT_32)
+                .build()
+                .unwrap();
+        ColumnDescriptor::new(
+            Arc::new(primitive),
+            max_def_level,
+            max_rep_level,
+            ColumnPath::from("col"),
+        )
+    }
 }