You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/11 18:11:31 UTC

[arrow-rs] branch master updated: Truncate Min/Max values in the Column Index (#4389)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 2462d3604 Truncate Min/Max values in the Column Index (#4389)
2462d3604 is described below

commit 2462d3604593c3df5f9c815dbc2486ebaaf3a596
Author: Adam Gutglick <ad...@gmail.com>
AuthorDate: Sun Jun 11 21:11:25 2023 +0300

    Truncate Min/Max values in the Column Index (#4389)
    
    * Initial work
    
    * Slight rename
    
    * Update parquet/src/column/writer/mod.rs
    
    Co-authored-by: Will Jones <wi...@gmail.com>
    
    * Handle utf8 vs binary truncation, include increases
    
    * Small cleanup
    
    * Update parquet/src/file/properties.rs
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    * Update parquet/src/file/properties.rs
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    * Update parquet/src/column/writer/mod.rs
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    * Review notes
    
    * Added handeling for some more cases - including not truncating non-BinaryArray data
    
    * Update parquet/src/column/writer/mod.rs
    
    Co-authored-by: Will Jones <wi...@gmail.com>
    
    * Handels increment better and some refactoring
    
    * Nicer handeling of physical type
    
    * More review notes
    
    ---------
    
    Co-authored-by: Will Jones <wi...@gmail.com>
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
 parquet/src/column/writer/mod.rs | 347 +++++++++++++++++++++++++++++++++++++--
 parquet/src/file/metadata.rs     |   8 +-
 parquet/src/file/properties.rs   |  24 +++
 3 files changed, 359 insertions(+), 20 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 93dff1b46..7a84680fa 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -20,6 +20,7 @@
 use crate::bloom_filter::Sbbf;
 use crate::format::{ColumnIndex, OffsetIndex};
 use std::collections::{BTreeSet, VecDeque};
+use std::str;
 
 use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
@@ -656,8 +657,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         if null_page && self.column_index_builder.valid() {
             self.column_index_builder.append(
                 null_page,
-                &[0; 1],
-                &[0; 1],
+                vec![0; 1],
+                vec![0; 1],
                 self.page_metrics.num_page_nulls as i64,
             );
         } else if self.column_index_builder.valid() {
@@ -668,19 +669,54 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                     self.column_index_builder.to_invalid();
                 }
                 Some(stat) => {
-                    self.column_index_builder.append(
-                        null_page,
-                        stat.min_bytes(),
-                        stat.max_bytes(),
-                        self.page_metrics.num_page_nulls as i64,
-                    );
+                    // We only truncate if the data is represented as binary
+                    match self.descr.physical_type() {
+                        Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
+                            self.column_index_builder.append(
+                                null_page,
+                                self.truncate_min_value(stat.min_bytes()),
+                                self.truncate_max_value(stat.max_bytes()),
+                                self.page_metrics.num_page_nulls as i64,
+                            );
+                        }
+                        _ => {
+                            self.column_index_builder.append(
+                                null_page,
+                                stat.min_bytes().to_vec(),
+                                stat.max_bytes().to_vec(),
+                                self.page_metrics.num_page_nulls as i64,
+                            );
+                        }
+                    }
                 }
             }
+
+            // update the offset index
+            self.offset_index_builder
+                .append_row_count(self.page_metrics.num_buffered_rows as i64);
         }
+    }
 
-        // update the offset index
-        self.offset_index_builder
-            .append_row_count(self.page_metrics.num_buffered_rows as i64);
+    fn truncate_min_value(&self, data: &[u8]) -> Vec<u8> {
+        self.props
+            .column_index_truncate_length()
+            .filter(|l| data.len() > *l)
+            .and_then(|l| match str::from_utf8(data) {
+                Ok(str_data) => truncate_utf8(str_data, l),
+                Err(_) => truncate_binary(data, l),
+            })
+            .unwrap_or_else(|| data.to_vec())
+    }
+
+    fn truncate_max_value(&self, data: &[u8]) -> Vec<u8> {
+        self.props
+            .column_index_truncate_length()
+            .filter(|l| data.len() > *l)
+            .and_then(|l| match str::from_utf8(data) {
+                Ok(str_data) => truncate_utf8(str_data, l).and_then(increment_utf8),
+                Err(_) => truncate_binary(data, l).and_then(increment),
+            })
+            .unwrap_or_else(|| data.to_vec())
     }
 
     /// Adds data page.
@@ -1152,9 +1188,76 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
     (a[1..]) > (b[1..])
 }
 
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8 string, while being less than `length` bytes.
+fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
+    // We return values like that at an earlier stage in the process.
+    assert!(data.len() >= length);
+    let mut char_indices = data.char_indices();
+
+    // We know `data` is a valid UTF8 encoded string, which means it has at least one valid UTF8 byte, which will make this loop exist.
+    while let Some((idx, c)) = char_indices.next_back() {
+        let split_point = idx + c.len_utf8();
+        if split_point <= length {
+            return data.as_bytes()[0..split_point].to_vec().into();
+        }
+    }
+
+    None
+}
+
+/// Truncate a binary slice to make sure its length is less than `length`
+fn truncate_binary(data: &[u8], length: usize) -> Option<Vec<u8>> {
+    // We return values like that at an earlier stage in the process.
+    assert!(data.len() >= length);
+    // If all bytes are already maximal, no need to truncate
+
+    data[0..length].to_vec().into()
+}
+
+/// Try and increment the bytes from right to left.
+///
+/// Returns `None` if all bytes are set to `u8::MAX`.
+fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
+    for byte in data.iter_mut().rev() {
+        let (incremented, overflow) = byte.overflowing_add(1);
+        *byte = incremented;
+
+        if !overflow {
+            return Some(data);
+        }
+    }
+
+    None
+}
+
+/// Try and increment the the string's bytes from right to left, returning when the result is a valid UTF8 string.
+/// Returns `None` when it can't increment any byte.
+fn increment_utf8(mut data: Vec<u8>) -> Option<Vec<u8>> {
+    for idx in (0..data.len()).rev() {
+        let original = data[idx];
+        let (mut byte, mut overflow) = data[idx].overflowing_add(1);
+
+        // Until overflow: 0xFF -> 0x00
+        while !overflow {
+            data[idx] = byte;
+
+            if str::from_utf8(&data).is_ok() {
+                return Some(data);
+            }
+            (byte, overflow) = data[idx].overflowing_add(1);
+        }
+
+        data[idx] = original;
+    }
+
+    None
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::format::BoundaryOrder;
+    use crate::{
+        file::properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, format::BoundaryOrder,
+    };
     use bytes::Bytes;
     use rand::distributions::uniform::SampleUniform;
     use std::sync::Arc;
@@ -2197,11 +2300,9 @@ mod tests {
             if let Statistics::Int32(stats) = stats {
                 // first page is [1,2,3,4]
                 // second page is [-5,2,4,8]
+                // note that we don't increment here, as this is a non BinaryArray type.
                 assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
-                assert_eq!(
-                    stats.max_bytes(),
-                    column_index.max_values.get(1).unwrap().as_slice()
-                );
+                assert_eq!(stats.max_bytes(), column_index.max_values.get(1).unwrap());
             } else {
                 panic!("expecting Statistics::Int32");
             }
@@ -2220,12 +2321,226 @@ mod tests {
         );
     }
 
+    /// Verify min/max value truncation in the column index works as expected
+    #[test]
+    fn test_column_offset_index_metadata_truncating() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Default::default();
+        let mut writer =
+            get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
+
+        let mut data = vec![FixedLenByteArray::default(); 3];
+        // This is the expected min value - "aaa..."
+        data[0].set_data(ByteBufferPtr::new(vec![97_u8; 200]));
+        // This is the expected max value - "ZZZ..."
+        data[1].set_data(ByteBufferPtr::new(vec![112_u8; 200]));
+        data[2].set_data(ByteBufferPtr::new(vec![98_u8; 200]));
+
+        writer.write_batch(&data, None, None).unwrap();
+
+        writer.flush_data_pages().unwrap();
+
+        let r = writer.close().unwrap();
+        let column_index = r.column_index.unwrap();
+        let offset_index = r.offset_index.unwrap();
+
+        assert_eq!(3, r.rows_written);
+
+        // column index
+        assert_eq!(1, column_index.null_pages.len());
+        assert_eq!(1, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+        assert!(!column_index.null_pages[0]);
+        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+        if let Some(stats) = r.metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::FixedLenByteArray(stats) = stats {
+                let column_index_min_value = column_index.min_values.get(0).unwrap();
+                let column_index_max_value = column_index.max_values.get(0).unwrap();
+
+                // Column index stats are truncated, while the column chunk's aren't.
+                assert_ne!(stats.min_bytes(), column_index_min_value.as_slice());
+                assert_ne!(stats.max_bytes(), column_index_max_value.as_slice());
+
+                assert_eq!(
+                    column_index_min_value.len(),
+                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
+                );
+                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
+                assert_eq!(
+                    column_index_max_value.len(),
+                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
+                );
+
+                // We expect the last byte to be incremented
+                assert_eq!(
+                    *column_index_max_value.last().unwrap(),
+                    *column_index_max_value.first().unwrap() + 1
+                );
+            } else {
+                panic!("expecting Statistics::FixedLenByteArray");
+            }
+        } else {
+            panic!("metadata missing statistics");
+        }
+    }
+
+    #[test]
+    fn test_column_offset_index_truncating_spec_example() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+
+        // Truncate values at 1 byte
+        let builder =
+            WriterProperties::builder().set_column_index_truncate_length(Some(1));
+        let props = Arc::new(builder.build());
+        let mut writer =
+            get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
+
+        let mut data = vec![FixedLenByteArray::default(); 1];
+        // This is the expected min value
+        data[0].set_data(ByteBufferPtr::new(
+            String::from("Blart Versenwald III").into_bytes(),
+        ));
+
+        writer.write_batch(&data, None, None).unwrap();
+
+        writer.flush_data_pages().unwrap();
+
+        let r = writer.close().unwrap();
+        let column_index = r.column_index.unwrap();
+        let offset_index = r.offset_index.unwrap();
+
+        assert_eq!(1, r.rows_written);
+
+        // column index
+        assert_eq!(1, column_index.null_pages.len());
+        assert_eq!(1, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+        assert!(!column_index.null_pages[0]);
+        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+        if let Some(stats) = r.metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::FixedLenByteArray(_stats) = stats {
+                let column_index_min_value = column_index.min_values.get(0).unwrap();
+                let column_index_max_value = column_index.max_values.get(0).unwrap();
+
+                assert_eq!(column_index_min_value.len(), 1);
+                assert_eq!(column_index_max_value.len(), 1);
+
+                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
+                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
+
+                assert_ne!(column_index_min_value, stats.min_bytes());
+                assert_ne!(column_index_max_value, stats.max_bytes());
+            } else {
+                panic!("expecting Statistics::FixedLenByteArray");
+            }
+        } else {
+            panic!("metadata missing statistics");
+        }
+    }
+
     #[test]
     fn test_send() {
         fn test<T: Send>() {}
         test::<ColumnWriterImpl<Int32Type>>();
     }
 
+    #[test]
+    fn test_increment() {
+        let v = increment(vec![0, 0, 0]).unwrap();
+        assert_eq!(&v, &[0, 0, 1]);
+
+        // Handle overflow
+        let v = increment(vec![0, 255, 255]).unwrap();
+        assert_eq!(&v, &[1, 0, 0]);
+
+        // Return `None` if all bytes are u8::MAX
+        let v = increment(vec![255, 255, 255]);
+        assert!(v.is_none());
+    }
+
+    #[test]
+    fn test_increment_utf8() {
+        // Basic ASCII case
+        let v = increment_utf8("hello".as_bytes().to_vec()).unwrap();
+        assert_eq!(&v, "hellp".as_bytes());
+
+        // Also show that BinaryArray level comparison works here
+        let mut greater = ByteArray::new();
+        greater.set_data(ByteBufferPtr::new(v));
+        let mut original = ByteArray::new();
+        original.set_data(ByteBufferPtr::new("hello".as_bytes().to_vec()));
+        assert!(greater > original);
+
+        // UTF8 string
+        let s = "โค๏ธ๐Ÿงก๐Ÿ’›๐Ÿ’š๐Ÿ’™๐Ÿ’œ";
+        let v = increment_utf8(s.as_bytes().to_vec()).unwrap();
+
+        if let Ok(new) = String::from_utf8(v) {
+            assert_ne!(&new, s);
+            assert_eq!(new, "โค๏ธ๐Ÿงก๐Ÿ’›๐Ÿ’š๐Ÿ’™๐Ÿ’");
+            assert!(new.as_bytes().last().unwrap() > s.as_bytes().last().unwrap());
+        } else {
+            panic!("Expected incremented UTF8 string to also be valid.")
+        }
+
+        // Max UTF8 character - should be a No-Op
+        let s = char::MAX.to_string();
+        assert_eq!(s.len(), 4);
+        let v = increment_utf8(s.as_bytes().to_vec());
+        assert!(v.is_none());
+
+        // Handle multi-byte UTF8 characters
+        let s = "a\u{10ffff}";
+        let v = increment_utf8(s.as_bytes().to_vec());
+        assert_eq!(&v.unwrap(), "b\u{10ffff}".as_bytes());
+    }
+
+    #[test]
+    fn test_truncate_utf8() {
+        // No-op
+        let data = "โค๏ธ๐Ÿงก๐Ÿ’›๐Ÿ’š๐Ÿ’™๐Ÿ’œ";
+        let r = truncate_utf8(data, data.as_bytes().len()).unwrap();
+        assert_eq!(r.len(), data.as_bytes().len());
+        assert_eq!(&r, data.as_bytes());
+        println!("len is {}", data.len());
+
+        // We slice it away from the UTF8 boundary
+        let r = truncate_utf8(data, 13).unwrap();
+        assert_eq!(r.len(), 10);
+        assert_eq!(&r, "โค๏ธ๐Ÿงก".as_bytes());
+
+        // One multi-byte code point, and a length shorter than it, so we can't slice it
+        let r = truncate_utf8("\u{0836}", 1);
+        assert!(r.is_none());
+    }
+
+    #[test]
+    fn test_truncate_max_binary_chars() {
+        let r =
+            truncate_binary(&[0xFF, 0xFE, 0xFD, 0xFF, 0xFF, 0xFF], 5).and_then(increment);
+
+        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
+
+        // We can truncate this slice, but increment it will fail
+        let truncated = truncate_binary(&[0xFF, 0xFF, 0xFF, 0xFF], 3);
+        assert!(truncated.is_some());
+
+        let incremented = truncated.and_then(increment);
+        assert!(incremented.is_none())
+    }
+
     /// Performs write-read roundtrip with randomly generated values and levels.
     /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
     /// for a column.
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 40f6cf312..bb8346306 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -868,13 +868,13 @@ impl ColumnIndexBuilder {
     pub fn append(
         &mut self,
         null_page: bool,
-        min_value: &[u8],
-        max_value: &[u8],
+        min_value: Vec<u8>,
+        max_value: Vec<u8>,
         null_count: i64,
     ) {
         self.null_pages.push(null_page);
-        self.min_values.push(min_value.to_vec());
-        self.max_values.push(max_value.to_vec());
+        self.min_values.push(min_value);
+        self.max_values.push(max_value);
         self.null_counts.push(null_count);
     }
 
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 9724fd7f4..3d6390c03 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -45,6 +45,8 @@ pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
 /// Default value for [`WriterProperties::created_by`]
 pub const DEFAULT_CREATED_BY: &str =
     concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
+/// Default value for [`WriterProperties::column_index_truncate_length`]
+pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);
 /// Default value for [`BloomFilterProperties::fpp`]
 pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
 /// Default value for [`BloomFilterProperties::ndv`]
@@ -121,6 +123,7 @@ pub struct WriterProperties {
     default_column_properties: ColumnProperties,
     column_properties: HashMap<ColumnPath, ColumnProperties>,
     sorting_columns: Option<Vec<SortingColumn>>,
+    column_index_truncate_length: Option<usize>,
 }
 
 impl Default for WriterProperties {
@@ -219,6 +222,13 @@ impl WriterProperties {
         self.sorting_columns.as_ref()
     }
 
+    /// Returns the maximum length of truncated min/max values in the column index.
+    ///
+    /// `None` if truncation is disabled, must be greater than 0 otherwise.
+    pub fn column_index_truncate_length(&self) -> Option<usize> {
+        self.column_index_truncate_length
+    }
+
     /// Returns encoding for a data page, when dictionary encoding is enabled.
     /// This is not configurable.
     #[inline]
@@ -314,6 +324,7 @@ pub struct WriterPropertiesBuilder {
     default_column_properties: ColumnProperties,
     column_properties: HashMap<ColumnPath, ColumnProperties>,
     sorting_columns: Option<Vec<SortingColumn>>,
+    column_index_truncate_length: Option<usize>,
 }
 
 impl WriterPropertiesBuilder {
@@ -331,6 +342,7 @@ impl WriterPropertiesBuilder {
             default_column_properties: Default::default(),
             column_properties: HashMap::new(),
             sorting_columns: None,
+            column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
         }
     }
 
@@ -348,6 +360,7 @@ impl WriterPropertiesBuilder {
             default_column_properties: self.default_column_properties,
             column_properties: self.column_properties,
             sorting_columns: self.sorting_columns,
+            column_index_truncate_length: self.column_index_truncate_length,
         }
     }
 
@@ -620,6 +633,17 @@ impl WriterPropertiesBuilder {
         self.get_mut_props(col).set_bloom_filter_ndv(value);
         self
     }
+
+    /// Sets the max length of min/max value fields in the column index. Must be greater than 0.
+    /// If set to `None` - there's no effective limit.
+    pub fn set_column_index_truncate_length(mut self, max_length: Option<usize>) -> Self {
+        if let Some(value) = max_length {
+            assert!(value > 0, "Cannot have a 0 column index truncate length. If you wish to disable min/max value truncation, set it to `None`.");
+        }
+
+        self.column_index_truncate_length = max_length;
+        self
+    }
 }
 
 /// Controls the level of statistics to be computed by the writer