You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/09 12:51:49 UTC

[arrow-rs] 01/02: bloom filter reader

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch bloom-filter-reader
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 3cc48d34d9bcc9e0922cd678ef34be6a26ff55b6
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Wed Nov 9 17:47:57 2022 +0800

    bloom filter reader
---
 parquet/src/bloom_filter/mod.rs | 60 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index e455e6252..9166eadc3 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -18,7 +18,20 @@
 //! Bloom filter implementation specific to Parquet, as described
 //! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
 
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::reader::ChunkReader;
+use crate::format::{
+    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
+    RowGroup,
+};
 use std::hash::Hasher;
+use std::io::Cursor;
+use std::io::IoSliceMut;
+use std::io::{Read, Seek, SeekFrom};
+use std::iter;
+use thrift::protocol::TCompactInputProtocol;
 use twox_hash::XxHash64;
 
 const SALT: [u32; 8] = [
@@ -72,6 +85,53 @@ fn block_check(block: &Block, hash: u32) -> bool {
 pub(crate) struct Sbbf(Vec<Block>);
 
 impl Sbbf {
+    /// Reads the bloom filter of `column_metadata` from `reader`: seeks to the
+    /// filter offset, decodes the Thrift header, then loads the bitset as blocks.
+    ///
+    /// Fails if the offset is unset or negative, seeking or reading fails, or
+    /// the header's `num_bytes` is not a positive multiple of 32.
+    pub fn read_from_column_chunk<R: Read + Seek>(
+        column_metadata: &ColumnChunkMetaData,
+        mut reader: &mut R,
+    ) -> Result<Self, ParquetError> {
+        let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
+            ParquetError::General("Bloom filter offset is not set".to_string())
+        })?;
+        // `as u64` would silently wrap a corrupt negative offset into a huge one
+        let offset = u64::try_from(offset).map_err(|_| {
+            ParquetError::General("Bloom filter offset is invalid".to_string())
+        })?;
+        reader.seek(SeekFrom::Start(offset))?;
+        // deserialize header; the bitset is read from where the header ends
+        let mut prot = TCompactInputProtocol::new(&mut reader);
+        let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
+
+        // irrefutable while each enum is a singleton; a new variant breaks the build
+        let BloomFilterAlgorithm::BLOCK(_) = header.algorithm;
+        let BloomFilterCompression::UNCOMPRESSED(_) = header.compression;
+        let BloomFilterHash::XXHASH(_) = header.hash;
+        // blocks are 32 bytes: reject sizes that truncate or yield an empty filter
+        let length = match usize::try_from(header.num_bytes) {
+            Ok(n) if n > 0 && n % (4 * 8) == 0 => n,
+            _ => {
+                let msg = "Bloom filter length is invalid".to_string();
+                return Err(ParquetError::General(msg));
+            }
+        };
+        let mut buffer = vec![0_u8; length];
+        reader.read_exact(&mut buffer).map_err(|e| {
+            ParquetError::General(format!("Could not read bloom filter: {}", e))
+        })?;
+        let to_block = |chunk: &[u8]| {
+            let mut block = [0_u32; 8];
+            for (i, word) in chunk.chunks_exact(4).enumerate() {
+                block[i] = u32::from_le_bytes(word.try_into().unwrap());
+            }
+            block
+        };
+        Ok(Self(buffer.chunks_exact(4 * 8).map(to_block).collect()))
+    }
+
     #[inline]
     fn hash_to_block_index(&self, hash: u64) -> usize {
         // unchecked_mul is unstable, but in reality this is safe, we'd just use saturating mul