You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/13 13:07:16 UTC

[arrow-rs] branch master updated: add bloom filter implementation based on split block (sbbf) spec (#3057)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b7af85cb8 add bloom filter implementation based on split block (sbbf) spec (#3057)
b7af85cb8 is described below

commit b7af85cb8dfe6887bb3fd43d1d76f659473b6927
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Sun Nov 13 21:07:11 2022 +0800

    add bloom filter implementation based on split block (sbbf) spec (#3057)
    
    * add bloom filter implementation based on split block spec
    
    * format and also revisit index method
    
    * bloom filter reader
    
    * create new function to facilitate fixture test
    
    * fix clippy
    
    * Update parquet/src/bloom_filter/mod.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Update parquet/src/bloom_filter/mod.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Update parquet/src/bloom_filter/mod.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Update parquet/src/bloom_filter/mod.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Update parquet/src/bloom_filter/mod.rs
    
    * Update parquet/src/bloom_filter/mod.rs
    
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    
    * fix clippy
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 parquet/Cargo.toml              |   5 +-
 parquet/src/bloom_filter/mod.rs | 217 ++++++++++++++++++++++++++++++++++++++++
 parquet/src/lib.rs              |   2 +
 3 files changed, 223 insertions(+), 1 deletion(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index b400b01a7..dda0518f9 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -57,6 +57,7 @@ seq-macro = { version = "0.3", default-features = false }
 futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
 tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
 hashbrown = { version = "0.13", default-features = false }
+twox-hash = { version = "1.6", optional = true }
 
 [dev-dependencies]
 base64 = { version = "0.13", default-features = false, features = ["std"] }
@@ -76,7 +77,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
 all-features = true
 
 [features]
-default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
+default = ["arrow", "bloom", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
 # Enable arrow reader/writer APIs
 arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
 # Enable CLI tools
@@ -89,6 +90,8 @@ test_common = ["arrow/test_utils"]
 experimental = []
 # Enable async APIs
 async = ["futures", "tokio"]
+# Enable Bloom filter support
+bloom = ["twox-hash"]
 
 [[test]]
 name = "arrow_writer_layout"
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
new file mode 100644
index 000000000..770fb53e8
--- /dev/null
+++ b/parquet/src/bloom_filter/mod.rs
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Bloom filter implementation specific to Parquet, as described
+//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
+
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::format::{
+    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+};
+use std::hash::Hasher;
+use std::io::{Read, Seek, SeekFrom};
+use thrift::protocol::TCompactInputProtocol;
+use twox_hash::XxHash64;
+
+/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach)
+///
+/// There is one constant per 32-bit word of a `Block`: `mask` multiplies its input by
+/// entry `i` to pick the bit that is set in word `i`.
+const SALT: [u32; 8] = [
+    0x47b6137b_u32,
+    0x44974d91_u32,
+    0x8824ad5b_u32,
+    0xa2b7289d_u32,
+    0x705495c7_u32,
+    0x2df1424b_u32,
+    0x9efc4947_u32,
+    0x5c6bfb31_u32,
+];
+
+/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
+/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
+///
+/// This is the storage unit of `Sbbf` and the return type of `mask`.
+type Block = [u32; 8];
+
+/// Builds the bit mask for `x`: the returned block has exactly one bit set in each of
+/// its eight words.
+///
+/// The bit position for word `i` is given by the top five bits of `x * SALT[i]`.
+fn mask(x: u32) -> Block {
+    // the multiplication is allowed to overflow, hence `wrapping_mul`; shifting right
+    // by 27 leaves five bits, so the bit position always lies within 0..32
+    SALT.map(|salt| 1_u32 << (x.wrapping_mul(salt) >> 27))
+}
+
+/// Sets in `block` every bit that is set in the mask derived from `hash`.
+fn block_insert(block: &mut Block, hash: u32) {
+    let bits = mask(hash);
+    for (word, bit) in block.iter_mut().zip(bits.iter()) {
+        *word |= *bit;
+    }
+}
+
+/// Returns `true` only if every bit of the mask derived from `hash` is also set in
+/// `block`, i.e. no word of the block is missing its mask bit.
+fn block_check(block: &Block, hash: u32) -> bool {
+    let bits = mask(hash);
+    block
+        .iter()
+        .zip(bits.iter())
+        .all(|(word, bit)| word & bit != 0)
+}
+
+/// A split block Bloom filter
+///
+/// The filter is a list of 256-bit blocks. The upper 32 bits of a 64-bit hash select
+/// the block, the lower 32 bits select the bits inside that block.
+pub struct Sbbf(Vec<Block>);
+
+impl Sbbf {
+    /// Builds a filter from its serialized bitset: every 32 bytes form one block made
+    /// of eight little-endian `u32` words. Trailing bytes that do not fill a whole
+    /// block are ignored.
+    fn new(bitset: &[u8]) -> Self {
+        let data = bitset
+            .chunks_exact(4 * 8)
+            .map(|chunk| {
+                let mut block = [0_u32; 8];
+                for (i, word) in chunk.chunks_exact(4).enumerate() {
+                    // `chunks_exact(4)` only yields 4-byte slices, so this cannot fail
+                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
+                }
+                block
+            })
+            .collect::<Vec<Block>>();
+        Self(data)
+    }
+
+    /// Reads the Bloom filter of a column chunk from `reader`.
+    ///
+    /// Seeks to the Bloom filter offset recorded in `column_metadata`, decodes the
+    /// header found there and then reads the `num_bytes` bytes of bitset following it.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the metadata holds no Bloom filter offset or a negative
+    /// one, if seeking or decoding the header fails, if the bitset length stored in
+    /// the header is negative or too small to hold a single block, or if the bitset
+    /// cannot be read in full.
+    pub fn read_from_column_chunk<R: Read + Seek>(
+        column_metadata: &ColumnChunkMetaData,
+        mut reader: &mut R,
+    ) -> Result<Self, ParquetError> {
+        let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
+            ParquetError::General("Bloom filter offset is not set".to_string())
+        })?;
+        // a plain `as u64` cast would turn a negative (corrupt) offset into a huge
+        // seek position instead of reporting it
+        let offset: u64 = offset.try_into().map_err(|_| {
+            ParquetError::General("Bloom filter offset is invalid".to_string())
+        })?;
+        reader.seek(SeekFrom::Start(offset))?;
+        // deserialize header
+        let mut prot = TCompactInputProtocol::new(&mut reader);
+        let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
+
+        match header.algorithm {
+            BloomFilterAlgorithm::BLOCK(_) => {
+                // this match exists to future proof the singleton algorithm enum
+            }
+        }
+        match header.compression {
+            BloomFilterCompression::UNCOMPRESSED(_) => {
+                // this match exists to future proof the singleton compression enum
+            }
+        }
+        match header.hash {
+            BloomFilterHash::XXHASH(_) => {
+                // this match exists to future proof the singleton hash enum
+            }
+        }
+        // length in bytes
+        let length: usize = header.num_bytes.try_into().map_err(|_| {
+            ParquetError::General("Bloom filter length is invalid".to_string())
+        })?;
+        // fewer bytes than one block would produce a filter without any block, and
+        // `insert`/`check` would then panic when indexing into it
+        if length < std::mem::size_of::<Block>() {
+            return Err(ParquetError::General(format!(
+                "Bloom filter length {} is too small to hold a single block",
+                length
+            )));
+        }
+        let mut buffer = vec![0_u8; length];
+        reader.read_exact(&mut buffer).map_err(|e| {
+            ParquetError::General(format!("Could not read bloom filter: {}", e))
+        })?;
+        Ok(Self::new(&buffer))
+    }
+
+    /// Maps a hash to the index of the block it belongs to, computed as
+    /// `((hash >> 32) * number_of_blocks) >> 32`.
+    #[inline]
+    fn hash_to_block_index(&self, hash: u64) -> usize {
+        // `hash >> 32` is below 2^32, so the product could only overflow for a filter
+        // with 2^32 or more blocks; `saturating_mul` is used because `unchecked_mul`
+        // is unstable, it is not expected to ever saturate
+        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
+    }
+
+    /// Insert a hash into the filter
+    ///
+    /// # Panics
+    ///
+    /// Panics if the filter contains no blocks.
+    pub fn insert(&mut self, hash: u64) {
+        let block_index = self.hash_to_block_index(hash);
+        let block = &mut self.0[block_index];
+        // the lower 32 bits of the hash select the bits within the block
+        block_insert(block, hash as u32);
+    }
+
+    /// Check if a hash is in the filter. May return
+    /// true for a hash that was never inserted ("false positive")
+    /// but never returns false for a hash that has been inserted.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the filter contains no blocks.
+    pub fn check(&self, hash: u64) -> bool {
+        let block_index = self.hash_to_block_index(hash);
+        let block = &self.0[block_index];
+        // the lower 32 bits of the hash select the bits within the block
+        block_check(block, hash as u32)
+    }
+}
+
+// seed handed to `XxHash64::with_seed` in `hash_bytes`;
+// per spec we use xxHash with seed=0
+const SEED: u64 = 0;
+
+/// Returns the 64-bit XxHash64 digest (seeded with `SEED`) of the bytes of `value`.
+///
+/// The result is the kind of hash that `Sbbf::insert` and `Sbbf::check` take.
+pub fn hash_bytes<A: AsRef<[u8]>>(value: A) -> u64 {
+    let bytes = value.as_ref();
+    let mut state = XxHash64::with_seed(SEED);
+    state.write(bytes);
+    state.finish()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // pins the digest of the empty input
+    #[test]
+    fn test_hash_bytes() {
+        assert_eq!(hash_bytes(b""), 17241709254077376921);
+    }
+
+    // every word of a mask must have exactly one bit set
+    #[test]
+    fn test_mask_set_quick_check() {
+        for i in 0..1_000_000 {
+            let result = mask(i);
+            assert!(result.iter().all(|&x| x.count_ones() == 1));
+        }
+    }
+
+    // a hash inserted into a block must be found in that block afterwards
+    #[test]
+    fn test_block_insert_and_check() {
+        for i in 0..1_000_000 {
+            let mut block = [0_u32; 8];
+            block_insert(&mut block, i);
+            assert!(block_check(&block, i));
+        }
+    }
+
+    #[test]
+    fn test_sbbf_insert_and_check() {
+        let mut sbbf = Sbbf(vec![[0_u32; 8]; 1_000]);
+        for i in 0..1_000_000_u64 {
+            // The block index is derived from the upper 32 bits of the hash, which are
+            // all zero for small integers: using `i` directly would only ever touch
+            // block 0. Spread the bits with a multiplicative mix instead.
+            let hash = i.wrapping_mul(0x9E37_79B9_7F4A_7C15);
+            sbbf.insert(hash);
+            assert!(sbbf.check(hash));
+        }
+        // make sure the test really exercised more than a single block
+        let used_blocks = sbbf
+            .0
+            .iter()
+            .filter(|block| block.iter().any(|&word| word != 0))
+            .count();
+        assert!(used_blocks > 1);
+    }
+
+    #[test]
+    fn test_with_fixture() {
+        // bloom filter produced by parquet-mr/spark for a column of i64 f"a{i}" for i in 0..10
+        let bitset: &[u8] = &[
+            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43,
+            33, 0, 5, 99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
+        ];
+        let sbbf = Sbbf::new(bitset);
+        for a in 0..10i64 {
+            let value = format!("a{}", a);
+            let hash = hash_bytes(value);
+            assert!(sbbf.check(hash));
+        }
+    }
+}
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index 07cddfc3f..cd29d02f8 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -84,6 +84,8 @@ pub mod arrow;
 pub mod column;
 experimental!(mod compression);
 experimental!(mod encodings);
+#[cfg(feature = "bloom")]
+pub mod bloom_filter;
 pub mod file;
 pub mod record;
 pub mod schema;