You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/09 12:51:48 UTC

[arrow-rs] branch bloom-filter-reader created (now 01aed8377)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a change to branch bloom-filter-reader
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


      at 01aed8377 create new function to facilitate fixture test

This branch includes the following new commits:

     new 3cc48d34d bloom filter reader
     new 01aed8377 create new function to facilitate fixture test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow-rs] 01/02: bloom filter reader

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch bloom-filter-reader
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 3cc48d34d9bcc9e0922cd678ef34be6a26ff55b6
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Wed Nov 9 17:47:57 2022 +0800

    bloom filter reader
---
 parquet/src/bloom_filter/mod.rs | 60 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index e455e6252..9166eadc3 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -18,7 +18,20 @@
 //! Bloom filter implementation specific to Parquet, as described
 //! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
 
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::reader::ChunkReader;
+use crate::format::{
+    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
+    RowGroup,
+};
 use std::hash::Hasher;
+use std::io::Cursor;
+use std::io::IoSliceMut;
+use std::io::{Read, Seek, SeekFrom};
+use std::iter;
+use thrift::protocol::TCompactInputProtocol;
 use twox_hash::XxHash64;
 
 const SALT: [u32; 8] = [
@@ -72,6 +85,53 @@ fn block_check(block: &Block, hash: u32) -> bool {
 pub(crate) struct Sbbf(Vec<Block>);
 
 impl Sbbf {
+    pub fn read_from_column_chunk<R: Read + Seek>(
+        column_metadata: &ColumnChunkMetaData,
+        mut reader: &mut R,
+    ) -> Result<Self, ParquetError> { // loads this chunk's bloom filter; Err if the offset is unset or a seek/decode/read step fails
+        let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
+            ParquetError::General("Bloom filter offset is not set".to_string())
+        })? as u64; // used below as an absolute position via SeekFrom::Start
+        reader.seek(SeekFrom::Start(offset))?;
+        // deserialize the BloomFilterHeader (thrift compact protocol) located at `offset`
+        let mut prot = TCompactInputProtocol::new(&mut reader); // borrows the binding so `reader` stays usable for the bitset read below
+        let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
+
+        match header.algorithm {
+            BloomFilterAlgorithm::BLOCK(_) => {
+                // no-op on purpose: with no wildcard arm, a new algorithm variant forces this match to be revisited
+            }
+        }
+        match header.compression {
+            BloomFilterCompression::UNCOMPRESSED(_) => {
+                // no-op on purpose: with no wildcard arm, a new compression variant forces this match to be revisited
+            }
+        }
+        match header.hash {
+            BloomFilterHash::XXHASH(_) => {
+                // no-op on purpose: with no wildcard arm, a new hash variant forces this match to be revisited
+            }
+        }
+        let length: usize = header.num_bytes.try_into().map_err(|_| {
+            ParquetError::General("Bloom filter length is invalid".to_string())
+        })?; // reached when num_bytes does not fit in usize
+        let mut buffer = vec![0_u8; length]; // NOTE(review): sized straight from the header value; no upper bound is checked here
+        reader.read_exact(&mut buffer).map_err(|e| {
+            ParquetError::General(format!("Could not read bloom filter: {}", e))
+        })?;
+        let data = buffer
+            .chunks_exact(4 * 8)
+            .map(|chunk| {
+                let mut block = [0_u32; 8];
+                for (i, word) in chunk.chunks_exact(4).enumerate() {
+                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
+                }
+                block
+            })
+            .collect::<Vec<Block>>();
+        Ok(Self(data))
+    }
+
     #[inline]
     fn hash_to_block_index(&self, hash: u64) -> usize {
         // unchecked_mul is unstable, but in reality this is safe, we'd just use saturating mul


[arrow-rs] 02/02: create new function to facilitate fixture test

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch bloom-filter-reader
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 01aed8377b4d9555405860b1bad1c6c3dbbf84f3
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Wed Nov 9 20:51:31 2022 +0800

    create new function to facilitate fixture test
---
 parquet/src/bloom_filter/mod.rs | 40 +++++++++++++++++++++++++++++-----------
 1 file changed, 29 insertions(+), 11 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 9166eadc3..e32a1d348 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -85,6 +85,20 @@ fn block_check(block: &Block, hash: u32) -> bool {
 pub(crate) struct Sbbf(Vec<Block>);
 
 impl Sbbf {
+    fn new(bitset: &[u8]) -> Self { // builds the filter from its raw bitset bytes
+        let data = bitset
+            .chunks_exact(4 * 8) // one block = 8 words of 4 bytes; a trailing partial chunk is silently ignored
+            .map(|chunk| {
+                let mut block = [0_u32; 8];
+                for (i, word) in chunk.chunks_exact(4).enumerate() {
+                    block[i] = u32::from_le_bytes(word.try_into().unwrap()); // little-endian words; unwrap cannot fail because chunks_exact(4) yields 4-byte slices
+                }
+                block
+            })
+            .collect::<Vec<Block>>();
+        Self(data) // NOTE(review): holds zero blocks when `bitset` has fewer than 32 bytes
+    }
+
     pub fn read_from_column_chunk<R: Read + Seek>(
         column_metadata: &ColumnChunkMetaData,
         mut reader: &mut R,
@@ -119,17 +133,7 @@ impl Sbbf {
         reader.read_exact(&mut buffer).map_err(|e| {
             ParquetError::General(format!("Could not read bloom filter: {}", e))
         })?;
-        let data = buffer
-            .chunks_exact(4 * 8)
-            .map(|chunk| {
-                let mut block = [0_u32; 8];
-                for (i, word) in chunk.chunks_exact(4).enumerate() {
-                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
-                }
-                block
-            })
-            .collect::<Vec<Block>>();
-        Ok(Self(data))
+        Ok(Self::new(&buffer))
     }
 
     #[inline]
@@ -197,4 +201,18 @@ mod tests {
             assert!(sbbf.check(i));
         }
     }
+
+    #[test]
+    fn test_with_fixture() { // checks a pre-computed bitset against the keys it is expected to contain
+        let bitset: &[u8] = &[ // 32 bytes, i.e. exactly one block; NOTE(review): fixture provenance is not shown here - TODO confirm
+            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43,
+            33, 0, 5, 99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
+        ];
+        let sbbf = Sbbf::new(bitset);
+        for a in 0..10i64 { // keys "a0" through "a9"
+            let value = format!("a{}", a);
+            let hash = hash_bytes(value);
+            assert!(sbbf.check(hash)); // every key must be reported as present by the fixture filter
+        }
+    }
 }