Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/18 18:53:35 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #3119: parquet bloom filter part III: add sbbf writer, remove `bloom` default feature, add reader properties

alamb commented on code in PR #3119:
URL: https://github.com/apache/arrow-rs/pull/3119#discussion_r1026750561


##########
parquet/Cargo.toml:
##########
@@ -77,7 +78,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
 all-features = true
 
 [features]
-default = ["arrow", "bloom", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]

Review Comment:
   👍 



##########
parquet/src/bloom_filter/mod.rs:
##########
@@ -113,7 +116,48 @@ fn read_bloom_filter_header_and_length(
     ))
 }
 
+const BITSET_MIN_LENGTH: usize = 32;
+const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
+
+#[inline]
+fn optimal_num_of_bytes(num_bytes: usize) -> usize {
+    let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
+    let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
+    num_bytes.next_power_of_two()
+}
+
+// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
+// given fpp = (1 - e^(-k * n / m)) ^ k
+// we have m = - k * n / ln(1 - fpp ^ (1 / k))
+// where k = number of hash functions, m = number of bits, n = number of distinct values
+#[inline]
+fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> f64 {
+    -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln()
+}
+
 impl Sbbf {
+    /// Create a new [Sbbf] with given number of distinct values and false positive probability.
+    /// Will panic if `fpp` is greater than 1.0 or less than 0.0.
+    pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {

Review Comment:
   Since this is a function meant for use outside the parquet crate, I would prefer it return an error rather than panic on bad input.
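   
   For example, a minimal sketch of a fallible version (the exact validation bounds, the error variant, and the `from_num_of_bytes` helper are assumptions for illustration, not the final API):
   
   ```rust
   impl Sbbf {
       /// Sketch: like `new_with_ndv_fpp`, but returns an error instead of
       /// panicking on invalid input.
       pub fn try_new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
           if !(0.0..1.0).contains(&fpp) {
               return Err(ParquetError::General(format!(
                   "invalid fpp {fpp}, must be between 0.0 and 1.0"
               )));
           }
           let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
           let num_bytes = optimal_num_of_bytes(num_bits as usize / 8);
           // Hypothetical constructor from a byte length; the real code may
           // build the bitset differently.
           Ok(Self::from_num_of_bytes(num_bytes))
       }
   }
   ```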



##########
parquet/src/file/properties.rs:
##########
@@ -82,6 +83,9 @@ const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
 const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
 const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
 const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
+const DEFAULT_BLOOM_FILTER_ENABLED: bool = false;
+const DEFAULT_BLOOM_FILTER_MAX_BYTES: u32 = 1024 * 1024;

Review Comment:
   👍 



##########
parquet/src/file/metadata.rs:
##########
@@ -236,7 +236,7 @@ pub struct RowGroupMetaData {
 }
 
 impl RowGroupMetaData {
-    /// Returns builer for row group metadata.
+    /// Returns builder for row group metadata.

Review Comment:
   👍 



##########
parquet/src/file/properties.rs:
##########
@@ -599,17 +681,25 @@ impl ColumnProperties {
     fn max_statistics_size(&self) -> Option<usize> {
         self.max_statistics_size
     }
+
+    def_opt_field_getter!(bloom_filter_enabled, bool);
+    def_opt_field_getter!(bloom_filter_fpp, f64);
+    def_opt_field_getter!(bloom_filter_max_bytes, u32);
+    def_opt_field_getter!(bloom_filter_ndv, u64);
 }
 
 /// Reference counted reader properties.
 pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
 
+const DEFAULT_READ_BLOOM_FILTER: bool = true;

Review Comment:
   I think this perhaps should default to `false` to maintain the current behavior and avoid (potentially) costly semi-random IO if the reader is not going to read the bloom filters.
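   
   If the default does become `false`, a caller that wants bloom filters could opt in explicitly, e.g. (the builder method name here is assumed from the surrounding property style):
   
   ```rust
   // Hypothetical opt-in, mirroring the existing builder pattern.
   let props = ReaderProperties::builder()
       .set_read_bloom_filter(true)
       .build();
   ```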



##########
parquet/src/file/writer.rs:
##########
@@ -212,6 +220,35 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    /// Serialize all the bloom filters to the file
+    fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write bloom filter to the file
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
+                match &self.bloom_filters[row_group_idx][column_idx] {
+                    Some(bloom_filter) => {

Review Comment:
   Given that the interaction with the TCompact protocol for reading is in `Sbbf`, what would you think about moving the interaction for writing there as well (like `Sbbf::write(writer)`)?
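   
   Something like this, perhaps (the header-writing helper is an assumption for illustration; the real serialization would go through the Thrift TCompact protocol like the read path does):
   
   ```rust
   impl Sbbf {
       /// Sketch: write the complete serialized filter (header + bitset).
       pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
           // Hypothetical helper that writes the Thrift BloomFilterHeader
           // with the TCompact protocol, mirroring the reader.
           self.write_header(&mut writer)?;
           self.write_bitset(&mut writer)
       }
   }
   ```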



##########
parquet/src/file/properties.rs:
##########
@@ -255,6 +279,11 @@ impl WriterProperties {
             .or_else(|| self.default_column_properties.max_statistics_size())
             .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE)
     }
+
+    def_col_property_getter!(bloom_filter_enabled, bool, DEFAULT_BLOOM_FILTER_ENABLED);

Review Comment:
   I think these properties need docstrings -- I am happy to help write them. In particular, they should mention that `ndv` and `fpp` are the knobs that control bloom filter accuracy. They should also say whether these limits apply to each column chunk or to the entire file (e.g. the `ndv` value is not the number of distinct values in the entire column). See the sketch below.
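   
   For example, something along these lines for `bloom_filter_ndv` (wording is only a sketch):
   
   ```rust
   /// Returns the number of distinct values (NDV) used to size the bloom
   /// filter for the given column. Together with `fpp`, this controls the
   /// filter's accuracy. Note this applies per column chunk (per row group),
   /// not to the distinct values of the entire column across the file.
   ```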
   
   



##########
parquet/src/bloom_filter/mod.rs:
##########
@@ -292,4 +363,46 @@ mod tests {
         assert_eq!(num_bytes, 32_i32);
         assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
     }
+
+    #[test]
+    fn test_optimal_num_of_bytes() {
+        for (input, expected) in &[
+            (0, 32),
+            (9, 32),
+            (31, 32),
+            (32, 32),
+            (33, 64),
+            (99, 128),
+            (1024, 1024),
+            (999_000_000, 128 * 1024 * 1024),
+        ] {
+            assert_eq!(*expected, optimal_num_of_bytes(*input));
+        }
+    }
+
+    #[test]
+    fn test_num_of_bits_from_ndv_fpp() {
+        for (fpp, ndv, num_bits) in &[
+            (0.1, 10, 57),
+            (0.01, 10, 96),
+            (0.001, 10, 146),
+            (0.1, 100, 577),
+            (0.01, 100, 968),
+            (0.001, 100, 1460),
+            (0.1, 1000, 5772),
+            (0.01, 1000, 9681),
+            (0.001, 1000, 14607),
+            (0.1, 10000, 57725),
+            (0.01, 10000, 96815),
+            (0.001, 10000, 146076),
+            (0.1, 100000, 577254),
+            (0.01, 100000, 968152),
+            (0.001, 100000, 1460769),
+            (0.1, 1000000, 5772541),
+            (0.01, 1000000, 9681526),
+            (0.001, 1000000, 14607697),

Review Comment:
   does this mean a 14MB bloom filter? It's ok as there is a limit in `optimal_num_of_bytes`, but when I saw this it was just 🤯 
   
   It might also be good to pass in some value larger than 2^32 to test that there isn't an overflow problem lurking
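   
   For example (the exact expectation is an assumption; the point is just that the f64 math stays finite for an ndv above 2^32):
   
   ```rust
   #[test]
   fn test_num_of_bits_from_ndv_fpp_large_ndv() {
       // An ndv of 2^40 is well above 2^32; optimal_num_of_bytes will clamp
       // the resulting size downstream, but the raw math should not overflow.
       let bits = num_of_bits_from_ndv_fpp(1 << 40, 0.01);
       assert!(bits.is_finite() && bits > 0.0);
   }
   ```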



##########
parquet/src/bloom_filter/mod.rs:
##########
@@ -128,6 +172,33 @@ impl Sbbf {
         Self(data)
     }
 
+    /// Write the bitset in serialized form to the writer.
+    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {

Review Comment:
   I think it would be good to write a test for round tripping the bloom filters (as in, write an `Sbbf` to a `Vec<u8>` and then read it back out and verify it is the same). Specifically, it would be nice to verify that the bytes are not scrambled, that the lengths are correct, and that empty bitsets are handled (if those are possible).
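   
   A sketch of such a test (the bitset-based constructor and the equality comparison are assumptions about the final API):
   
   ```rust
   #[test]
   fn test_bitset_roundtrip() {
       let sbbf = Sbbf::new_with_ndv_fpp(100, 0.01);
       let mut buf: Vec<u8> = Vec::new();
       sbbf.write_bitset(&mut buf).unwrap();
       // Rebuild a filter from the serialized bytes; the lengths and byte
       // order should survive the round trip unchanged.
       let round_tripped = Sbbf::new(&buf);
       assert_eq!(round_tripped, sbbf);
   }
   ```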



##########
parquet/src/bloom_filter/mod.rs:
##########
@@ -113,7 +116,48 @@ fn read_bloom_filter_header_and_length(
     ))
 }
 
+const BITSET_MIN_LENGTH: usize = 32;
+const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
+
+#[inline]
+fn optimal_num_of_bytes(num_bytes: usize) -> usize {
+    let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
+    let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
+    num_bytes.next_power_of_two()
+}
+
+// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf

Review Comment:
   Here is the parquet-mr code: https://github.com/apache/parquet-mr/blob/d057b39d93014fe40f5067ee4a33621e65c91552/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java#L277-L304
   
   That looks very similar



##########
parquet/src/file/reader.rs:
##########
@@ -145,9 +144,8 @@ pub trait RowGroupReader: Send + Sync {
         Ok(col_reader)
     }
 
-    #[cfg(feature = "bloom")]
     /// Get bloom filter for the `i`th column chunk, if present.

Review Comment:
   ```suggestion
       /// Get bloom filter for the `i`th column chunk, if present and the reader was configured
       /// to read bloom filters.
   ```


