You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/09 12:51:49 UTC
[arrow-rs] 01/02: bloom filter reader
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch bloom-filter-reader
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
commit 3cc48d34d9bcc9e0922cd678ef34be6a26ff55b6
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Wed Nov 9 17:47:57 2022 +0800
bloom filter reader
---
parquet/src/bloom_filter/mod.rs | 60 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 60 insertions(+)
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index e455e6252..9166eadc3 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -18,7 +18,20 @@
//! Bloom filter implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::reader::ChunkReader;
+use crate::format::{
+ BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+ BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
+ RowGroup,
+};
use std::hash::Hasher;
+use std::io::Cursor;
+use std::io::IoSliceMut;
+use std::io::{Read, Seek, SeekFrom};
+use std::iter;
+use thrift::protocol::TCompactInputProtocol;
use twox_hash::XxHash64;
const SALT: [u32; 8] = [
@@ -72,6 +85,53 @@ fn block_check(block: &Block, hash: u32) -> bool {
/// A split block Bloom filter (SBBF): a sequence of fixed-size blocks
/// (each block is built as `[u32; 8]`, i.e. 256 bits — see the block
/// assembly in `read_from_column_chunk`).
pub(crate) struct Sbbf(Vec<Block>);
impl Sbbf {
+ pub fn read_from_column_chunk<R: Read + Seek>(
+ column_metadata: &ColumnChunkMetaData,
+ mut reader: &mut R,
+ ) -> Result<Self, ParquetError> {
+ let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
+ ParquetError::General("Bloom filter offset is not set".to_string())
+ })? as u64;
+ reader.seek(SeekFrom::Start(offset))?;
+ // deserialize header
+ let mut prot = TCompactInputProtocol::new(&mut reader);
+ let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
+
+ match header.algorithm {
+ BloomFilterAlgorithm::BLOCK(_) => {
+ // this match exists to future proof the singleton algorithm enum
+ }
+ }
+ match header.compression {
+ BloomFilterCompression::UNCOMPRESSED(_) => {
+ // this match exists to future proof the singleton compression enum
+ }
+ }
+ match header.hash {
+ BloomFilterHash::XXHASH(_) => {
+ // this match exists to future proof the singleton hash enum
+ }
+ }
+ let length: usize = header.num_bytes.try_into().map_err(|_| {
+ ParquetError::General("Bloom filter length is invalid".to_string())
+ })?;
+ let mut buffer = vec![0_u8; length];
+ reader.read_exact(&mut buffer).map_err(|e| {
+ ParquetError::General(format!("Could not read bloom filter: {}", e))
+ })?;
+ let data = buffer
+ .chunks_exact(4 * 8)
+ .map(|chunk| {
+ let mut block = [0_u32; 8];
+ for (i, word) in chunk.chunks_exact(4).enumerate() {
+ block[i] = u32::from_le_bytes(word.try_into().unwrap());
+ }
+ block
+ })
+ .collect::<Vec<Block>>();
+ Ok(Self(data))
+ }
+
#[inline]
fn hash_to_block_index(&self, hash: u64) -> usize {
// unchecked_mul is unstable, but in reality this is safe, we'd just use saturating mul