You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/11 18:11:31 UTC
[arrow-rs] branch master updated: Truncate Min/Max values in the Column Index (#4389)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2462d3604 Truncate Min/Max values in the Column Index (#4389)
2462d3604 is described below
commit 2462d3604593c3df5f9c815dbc2486ebaaf3a596
Author: Adam Gutglick <ad...@gmail.com>
AuthorDate: Sun Jun 11 21:11:25 2023 +0300
Truncate Min/Max values in the Column Index (#4389)
* Initial work
* Slight rename
* Update parquet/src/column/writer/mod.rs
Co-authored-by: Will Jones <wi...@gmail.com>
* Handle utf8 vs binary truncation, include increases
* Small cleanup
* Update parquet/src/file/properties.rs
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
* Update parquet/src/file/properties.rs
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
* Update parquet/src/column/writer/mod.rs
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
* Review notes
* Added handling for some more cases - including not truncating non-BinaryArray data
* Update parquet/src/column/writer/mod.rs
Co-authored-by: Will Jones <wi...@gmail.com>
* Handles increment better and some refactoring
* Nicer handling of physical type
* More review notes
---------
Co-authored-by: Will Jones <wi...@gmail.com>
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
parquet/src/column/writer/mod.rs | 347 +++++++++++++++++++++++++++++++++++++--
parquet/src/file/metadata.rs | 8 +-
parquet/src/file/properties.rs | 24 +++
3 files changed, 359 insertions(+), 20 deletions(-)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 93dff1b46..7a84680fa 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -20,6 +20,7 @@
use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
+use std::str;
use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
@@ -656,8 +657,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
if null_page && self.column_index_builder.valid() {
self.column_index_builder.append(
null_page,
- &[0; 1],
- &[0; 1],
+ vec![0; 1],
+ vec![0; 1],
self.page_metrics.num_page_nulls as i64,
);
} else if self.column_index_builder.valid() {
@@ -668,19 +669,54 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.column_index_builder.to_invalid();
}
Some(stat) => {
- self.column_index_builder.append(
- null_page,
- stat.min_bytes(),
- stat.max_bytes(),
- self.page_metrics.num_page_nulls as i64,
- );
+ // We only truncate if the data is represented as binary
+ match self.descr.physical_type() {
+ Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
+ self.column_index_builder.append(
+ null_page,
+ self.truncate_min_value(stat.min_bytes()),
+ self.truncate_max_value(stat.max_bytes()),
+ self.page_metrics.num_page_nulls as i64,
+ );
+ }
+ _ => {
+ self.column_index_builder.append(
+ null_page,
+ stat.min_bytes().to_vec(),
+ stat.max_bytes().to_vec(),
+ self.page_metrics.num_page_nulls as i64,
+ );
+ }
+ }
}
}
+
+ // update the offset index
+ self.offset_index_builder
+ .append_row_count(self.page_metrics.num_buffered_rows as i64);
}
+ }
- // update the offset index
- self.offset_index_builder
- .append_row_count(self.page_metrics.num_buffered_rows as i64);
+ fn truncate_min_value(&self, data: &[u8]) -> Vec<u8> {
+ self.props
+ .column_index_truncate_length()
+ .filter(|l| data.len() > *l)
+ .and_then(|l| match str::from_utf8(data) {
+ Ok(str_data) => truncate_utf8(str_data, l),
+ Err(_) => truncate_binary(data, l),
+ })
+ .unwrap_or_else(|| data.to_vec())
+ }
+
+ fn truncate_max_value(&self, data: &[u8]) -> Vec<u8> {
+ self.props
+ .column_index_truncate_length()
+ .filter(|l| data.len() > *l)
+ .and_then(|l| match str::from_utf8(data) {
+ Ok(str_data) => truncate_utf8(str_data, l).and_then(increment_utf8),
+ Err(_) => truncate_binary(data, l).and_then(increment),
+ })
+ .unwrap_or_else(|| data.to_vec())
}
/// Adds data page.
@@ -1152,9 +1188,76 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
(a[1..]) > (b[1..])
}
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8 string, while being at most `length` bytes long.
+fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
+ // Values shorter than `length` are never passed in: the callers return them unchanged at an earlier stage.
+ assert!(data.len() >= length);
+ let mut char_indices = data.char_indices();
+
+ // We know `data` is a valid UTF8 encoded string, which means it has at least one valid UTF8 byte, which will make this loop exit.
+ while let Some((idx, c)) = char_indices.next_back() {
+ let split_point = idx + c.len_utf8();
+ if split_point <= length {
+ return data.as_bytes()[0..split_point].to_vec().into();
+ }
+ }
+
+ None
+}
+
+/// Truncate a binary slice to its first `length` bytes, so that its length is at most `length`.
+fn truncate_binary(data: &[u8], length: usize) -> Option<Vec<u8>> {
+ // Values shorter than `length` are never passed in: the callers return them unchanged at an earlier stage.
+ assert!(data.len() >= length);
+ // If all bytes are already maximal, no need to truncate
+
+ data[0..length].to_vec().into()
+}
+
+/// Try and increment the bytes from right to left.
+///
+/// Returns `None` if all bytes are set to `u8::MAX`.
+fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
+ for byte in data.iter_mut().rev() {
+ let (incremented, overflow) = byte.overflowing_add(1);
+ *byte = incremented;
+
+ if !overflow {
+ return Some(data);
+ }
+ }
+
+ None
+}
+
+/// Try and increment the string's bytes from right to left, returning when the result is a valid UTF8 string.
+/// Returns `None` when it can't increment any byte.
+fn increment_utf8(mut data: Vec<u8>) -> Option<Vec<u8>> {
+ for idx in (0..data.len()).rev() {
+ let original = data[idx];
+ let (mut byte, mut overflow) = data[idx].overflowing_add(1);
+
+ // Until overflow: 0xFF -> 0x00
+ while !overflow {
+ data[idx] = byte;
+
+ if str::from_utf8(&data).is_ok() {
+ return Some(data);
+ }
+ (byte, overflow) = data[idx].overflowing_add(1);
+ }
+
+ data[idx] = original;
+ }
+
+ None
+}
+
#[cfg(test)]
mod tests {
- use crate::format::BoundaryOrder;
+ use crate::{
+ file::properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, format::BoundaryOrder,
+ };
use bytes::Bytes;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;
@@ -2197,11 +2300,9 @@ mod tests {
if let Statistics::Int32(stats) = stats {
// first page is [1,2,3,4]
// second page is [-5,2,4,8]
+ // note that we don't increment here, as this is a non BinaryArray type.
assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
- assert_eq!(
- stats.max_bytes(),
- column_index.max_values.get(1).unwrap().as_slice()
- );
+ assert_eq!(stats.max_bytes(), column_index.max_values.get(1).unwrap());
} else {
panic!("expecting Statistics::Int32");
}
@@ -2220,12 +2321,226 @@ mod tests {
);
}
+ /// Verify min/max value truncation in the column index works as expected
+ #[test]
+ fn test_column_offset_index_metadata_truncating() {
+ // write data
+ // and check the offset index and column index
+ let page_writer = get_test_page_writer();
+ let props = Default::default();
+ let mut writer =
+ get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
+
+ let mut data = vec![FixedLenByteArray::default(); 3];
+ // This is the expected min value - "aaa..."
+ data[0].set_data(ByteBufferPtr::new(vec![97_u8; 200]));
+ // This is the expected max value - "ppp..."
+ data[1].set_data(ByteBufferPtr::new(vec![112_u8; 200]));
+ data[2].set_data(ByteBufferPtr::new(vec![98_u8; 200]));
+
+ writer.write_batch(&data, None, None).unwrap();
+
+ writer.flush_data_pages().unwrap();
+
+ let r = writer.close().unwrap();
+ let column_index = r.column_index.unwrap();
+ let offset_index = r.offset_index.unwrap();
+
+ assert_eq!(3, r.rows_written);
+
+ // column index
+ assert_eq!(1, column_index.null_pages.len());
+ assert_eq!(1, offset_index.page_locations.len());
+ assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+ assert!(!column_index.null_pages[0]);
+ assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+ if let Some(stats) = r.metadata.statistics() {
+ assert!(stats.has_min_max_set());
+ assert_eq!(stats.null_count(), 0);
+ assert_eq!(stats.distinct_count(), None);
+ if let Statistics::FixedLenByteArray(stats) = stats {
+ let column_index_min_value = column_index.min_values.get(0).unwrap();
+ let column_index_max_value = column_index.max_values.get(0).unwrap();
+
+ // Column index stats are truncated, while the column chunk's aren't.
+ assert_ne!(stats.min_bytes(), column_index_min_value.as_slice());
+ assert_ne!(stats.max_bytes(), column_index_max_value.as_slice());
+
+ assert_eq!(
+ column_index_min_value.len(),
+ DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
+ );
+ assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
+ assert_eq!(
+ column_index_max_value.len(),
+ DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
+ );
+
+ // We expect the last byte to be incremented
+ assert_eq!(
+ *column_index_max_value.last().unwrap(),
+ *column_index_max_value.first().unwrap() + 1
+ );
+ } else {
+ panic!("expecting Statistics::FixedLenByteArray");
+ }
+ } else {
+ panic!("metadata missing statistics");
+ }
+ }
+
+ #[test]
+ fn test_column_offset_index_truncating_spec_example() {
+ // write data
+ // and check the offset index and column index
+ let page_writer = get_test_page_writer();
+
+ // Truncate values at 1 byte
+ let builder =
+ WriterProperties::builder().set_column_index_truncate_length(Some(1));
+ let props = Arc::new(builder.build());
+ let mut writer =
+ get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
+
+ let mut data = vec![FixedLenByteArray::default(); 1];
+ // This is the expected min value
+ data[0].set_data(ByteBufferPtr::new(
+ String::from("Blart Versenwald III").into_bytes(),
+ ));
+
+ writer.write_batch(&data, None, None).unwrap();
+
+ writer.flush_data_pages().unwrap();
+
+ let r = writer.close().unwrap();
+ let column_index = r.column_index.unwrap();
+ let offset_index = r.offset_index.unwrap();
+
+ assert_eq!(1, r.rows_written);
+
+ // column index
+ assert_eq!(1, column_index.null_pages.len());
+ assert_eq!(1, offset_index.page_locations.len());
+ assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+ assert!(!column_index.null_pages[0]);
+ assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+ if let Some(stats) = r.metadata.statistics() {
+ assert!(stats.has_min_max_set());
+ assert_eq!(stats.null_count(), 0);
+ assert_eq!(stats.distinct_count(), None);
+ if let Statistics::FixedLenByteArray(_stats) = stats {
+ let column_index_min_value = column_index.min_values.get(0).unwrap();
+ let column_index_max_value = column_index.max_values.get(0).unwrap();
+
+ assert_eq!(column_index_min_value.len(), 1);
+ assert_eq!(column_index_max_value.len(), 1);
+
+ assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
+ assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
+
+ assert_ne!(column_index_min_value, stats.min_bytes());
+ assert_ne!(column_index_max_value, stats.max_bytes());
+ } else {
+ panic!("expecting Statistics::FixedLenByteArray");
+ }
+ } else {
+ panic!("metadata missing statistics");
+ }
+ }
+
#[test]
fn test_send() {
fn test<T: Send>() {}
test::<ColumnWriterImpl<Int32Type>>();
}
+ #[test]
+ fn test_increment() {
+ let v = increment(vec![0, 0, 0]).unwrap();
+ assert_eq!(&v, &[0, 0, 1]);
+
+ // Handle overflow
+ let v = increment(vec![0, 255, 255]).unwrap();
+ assert_eq!(&v, &[1, 0, 0]);
+
+ // Return `None` if all bytes are u8::MAX
+ let v = increment(vec![255, 255, 255]);
+ assert!(v.is_none());
+ }
+
+ #[test]
+ fn test_increment_utf8() {
+ // Basic ASCII case
+ let v = increment_utf8("hello".as_bytes().to_vec()).unwrap();
+ assert_eq!(&v, "hellp".as_bytes());
+
+ // Also show that BinaryArray level comparison works here
+ let mut greater = ByteArray::new();
+ greater.set_data(ByteBufferPtr::new(v));
+ let mut original = ByteArray::new();
+ original.set_data(ByteBufferPtr::new("hello".as_bytes().to_vec()));
+ assert!(greater > original);
+
+ // UTF8 string
+ let s = "❤️🧡💛💚💙💜";
+ let v = increment_utf8(s.as_bytes().to_vec()).unwrap();
+
+ if let Ok(new) = String::from_utf8(v) {
+ assert_ne!(&new, s);
+ assert_eq!(new, "❤️🧡💛💚💙💝");
+ assert!(new.as_bytes().last().unwrap() > s.as_bytes().last().unwrap());
+ } else {
+ panic!("Expected incremented UTF8 string to also be valid.")
+ }
+
+ // Max UTF8 character - should be a No-Op
+ let s = char::MAX.to_string();
+ assert_eq!(s.len(), 4);
+ let v = increment_utf8(s.as_bytes().to_vec());
+ assert!(v.is_none());
+
+ // Handle multi-byte UTF8 characters
+ let s = "a\u{10ffff}";
+ let v = increment_utf8(s.as_bytes().to_vec());
+ assert_eq!(&v.unwrap(), "b\u{10ffff}".as_bytes());
+ }
+
+ #[test]
+ fn test_truncate_utf8() {
+ // No-op
+ let data = "❤️🧡💛💚💙💜";
+ let r = truncate_utf8(data, data.as_bytes().len()).unwrap();
+ assert_eq!(r.len(), data.as_bytes().len());
+ assert_eq!(&r, data.as_bytes());
+ println!("len is {}", data.len());
+
+ // We slice it away from the UTF8 boundary
+ let r = truncate_utf8(data, 13).unwrap();
+ assert_eq!(r.len(), 10);
+ assert_eq!(&r, "❤️🧡".as_bytes());
+
+ // One multi-byte code point, and a length shorter than it, so we can't slice it
+ let r = truncate_utf8("\u{0836}", 1);
+ assert!(r.is_none());
+ }
+
+ #[test]
+ fn test_truncate_max_binary_chars() {
+ let r =
+ truncate_binary(&[0xFF, 0xFE, 0xFD, 0xFF, 0xFF, 0xFF], 5).and_then(increment);
+
+ assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
+
+ // We can truncate this slice, but incrementing it will fail
+ let truncated = truncate_binary(&[0xFF, 0xFF, 0xFF, 0xFF], 3);
+ assert!(truncated.is_some());
+
+ let incremented = truncated.and_then(increment);
+ assert!(incremented.is_none())
+ }
+
/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 40f6cf312..bb8346306 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -868,13 +868,13 @@ impl ColumnIndexBuilder {
pub fn append(
&mut self,
null_page: bool,
- min_value: &[u8],
- max_value: &[u8],
+ min_value: Vec<u8>,
+ max_value: Vec<u8>,
null_count: i64,
) {
self.null_pages.push(null_page);
- self.min_values.push(min_value.to_vec());
- self.max_values.push(max_value.to_vec());
+ self.min_values.push(min_value);
+ self.max_values.push(max_value);
self.null_counts.push(null_count);
}
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 9724fd7f4..3d6390c03 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -45,6 +45,8 @@ pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::created_by`]
pub const DEFAULT_CREATED_BY: &str =
concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
+/// Default value for [`WriterProperties::column_index_truncate_length`]
+pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);
/// Default value for [`BloomFilterProperties::fpp`]
pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
/// Default value for [`BloomFilterProperties::ndv`]
@@ -121,6 +123,7 @@ pub struct WriterProperties {
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
+ column_index_truncate_length: Option<usize>,
}
impl Default for WriterProperties {
@@ -219,6 +222,13 @@ impl WriterProperties {
self.sorting_columns.as_ref()
}
+ /// Returns the maximum length of truncated min/max values in the column index.
+ ///
+ /// `None` if truncation is disabled, must be greater than 0 otherwise.
+ pub fn column_index_truncate_length(&self) -> Option<usize> {
+ self.column_index_truncate_length
+ }
+
/// Returns encoding for a data page, when dictionary encoding is enabled.
/// This is not configurable.
#[inline]
@@ -314,6 +324,7 @@ pub struct WriterPropertiesBuilder {
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
+ column_index_truncate_length: Option<usize>,
}
impl WriterPropertiesBuilder {
@@ -331,6 +342,7 @@ impl WriterPropertiesBuilder {
default_column_properties: Default::default(),
column_properties: HashMap::new(),
sorting_columns: None,
+ column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
}
}
@@ -348,6 +360,7 @@ impl WriterPropertiesBuilder {
default_column_properties: self.default_column_properties,
column_properties: self.column_properties,
sorting_columns: self.sorting_columns,
+ column_index_truncate_length: self.column_index_truncate_length,
}
}
@@ -620,6 +633,17 @@ impl WriterPropertiesBuilder {
self.get_mut_props(col).set_bloom_filter_ndv(value);
self
}
+
+ /// Sets the max length of min/max value fields in the column index. Must be greater than 0.
+ /// If set to `None` - there's no effective limit.
+ pub fn set_column_index_truncate_length(mut self, max_length: Option<usize>) -> Self {
+ if let Some(value) = max_length {
+ assert!(value > 0, "Cannot have a 0 column index truncate length. If you wish to disable min/max value truncation, set it to `None`.");
+ }
+
+ self.column_index_truncate_length = max_length;
+ self
+ }
}
/// Controls the level of statistics to be computed by the writer