You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/16 03:04:45 UTC

[arrow-rs] branch master updated: parquet bloom filter part II: read sbbf bitset from row group reader, update API, and add cli demo (#3102)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 73d66d837 parquet bloom filter part II: read sbbf bitset from row group reader, update API, and add cli demo (#3102)
73d66d837 is described below

commit 73d66d837c20e3b80a77fdad5018f7872de4ef9d
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Wed Nov 16 11:04:40 2022 +0800

    parquet bloom filter part II: read sbbf bitset from row group reader, update API, and add cli demo (#3102)
    
    * add feature flag
    
    * add api
    
    * fix reading with chunk reader
    
    * refactor
    
    * add a binary to demo
    
    * add bin
    
    * remove unused
    
    * fix clippy
    
    * adjust byte size
    
    * update read method
    
    * parquet-show-bloom-filter with bloom feature required
    
    * remove extern crate
    
    * get rid of loop read
    
    * refactor to test
    
    * rework api
    
    * remove unused trait
    
    * update help
---
 parquet/Cargo.toml                           |   4 +
 parquet/README.md                            |   1 +
 parquet/src/bin/parquet-read.rs              |   2 -
 parquet/src/bin/parquet-rowcount.rs          |   1 -
 parquet/src/bin/parquet-schema.rs            |   1 -
 parquet/src/bin/parquet-show-bloom-filter.rs | 110 ++++++++++++++++++++++++
 parquet/src/bloom_filter/mod.rs              | 124 ++++++++++++++++++++++-----
 parquet/src/file/reader.rs                   |   6 ++
 parquet/src/file/serialized_reader.rs        |  18 ++--
 9 files changed, 235 insertions(+), 32 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index a5d43bf54..fc7c8218a 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -113,6 +113,10 @@ required-features = ["cli"]
 name = "parquet-fromcsv"
 required-features = ["arrow", "cli"]
 
+[[bin]]
+name = "parquet-show-bloom-filter"
+required-features = ["cli", "bloom"]
+
 [[bench]]
 name = "arrow_writer"
 required-features = ["arrow"]
diff --git a/parquet/README.md b/parquet/README.md
index d904fc64e..c9245b082 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -41,6 +41,7 @@ However, for historical reasons, this crate uses versions with major numbers gre
 The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:
 
 - `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
+- `bloom` (default) - support for [split block bloom filter](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) for reading from / writing to parquet
 - `async` - support `async` APIs for reading parquet
 - `json` - support for reading / writing `json` data to / from parquet
 - `brotli` (default) - support for parquet using `brotli` compression
diff --git a/parquet/src/bin/parquet-read.rs b/parquet/src/bin/parquet-read.rs
index cf8009956..117f9ee0b 100644
--- a/parquet/src/bin/parquet-read.rs
+++ b/parquet/src/bin/parquet-read.rs
@@ -36,8 +36,6 @@
 //! Note that `parquet-read` reads full file schema, no projection or filtering is
 //! applied.
 
-extern crate parquet;
-
 use clap::Parser;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use parquet::record::Row;
diff --git a/parquet/src/bin/parquet-rowcount.rs b/parquet/src/bin/parquet-rowcount.rs
index 491f582c5..5069d4b25 100644
--- a/parquet/src/bin/parquet-rowcount.rs
+++ b/parquet/src/bin/parquet-rowcount.rs
@@ -36,7 +36,6 @@
 //! Note that `parquet-rowcount` reads full file schema, no projection or filtering is
 //! applied.
 
-extern crate parquet;
 use clap::Parser;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use std::{fs::File, path::Path};
diff --git a/parquet/src/bin/parquet-schema.rs b/parquet/src/bin/parquet-schema.rs
index cd8e76922..ff7798a91 100644
--- a/parquet/src/bin/parquet-schema.rs
+++ b/parquet/src/bin/parquet-schema.rs
@@ -36,7 +36,6 @@
 //! Note that `verbose` is an optional boolean flag that allows to print schema only,
 //! when not provided or print full file metadata when provided.
 
-extern crate parquet;
 use clap::Parser;
 use parquet::{
     file::reader::{FileReader, SerializedFileReader},
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
new file mode 100644
index 000000000..a4dbdbe67
--- /dev/null
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary file to read bloom filter data from a Parquet file.
+//!
+//! # Install
+//!
+//! `parquet-show-bloom-filter` can be installed using `cargo`:
+//! ```
+//! cargo install parquet --features=cli
+//! ```
+//! After this `parquet-show-bloom-filter` should be available:
+//! ```
+//! parquet-show-bloom-filter --file-name XYZ.parquet --column id --values a
+//! ```
+//!
+//! The binary can also be built from the source code and run as follows:
+//! ```
+//! cargo run --features=cli --bin parquet-show-bloom-filter -- --file-name XYZ.parquet --column id --values a
+//! ```
+
+use clap::Parser;
+use parquet::file::reader::{FileReader, SerializedFileReader};
+use std::{fs::File, path::Path};
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("Binary file to read bloom filter data from a Parquet file"), long_about = None)]
+struct Args {
+    #[clap(short, long, help("Path to the parquet file"))]
+    file_name: String,
+    #[clap(
+        short,
+        long,
+        help("Check the bloom filter indexes for the given column")
+    )]
+    column: String,
+    #[clap(
+        short,
+        long,
+        help("Check if the given values match bloom filter, the values will be evaluated as strings"),
+        required = true
+    )]
+    values: Vec<String>,
+}
+
+fn main() {
+    let args = Args::parse();
+    let file_name = args.file_name;
+    let path = Path::new(&file_name);
+    let file = File::open(path).expect("Unable to open file");
+
+    let file_reader =
+        SerializedFileReader::new(file).expect("Unable to open file as Parquet");
+    let metadata = file_reader.metadata();
+    for (ri, row_group) in metadata.row_groups().iter().enumerate() {
+        println!("Row group #{}", ri);
+        println!("{}", "=".repeat(80));
+        if let Some((column_index, _)) = row_group
+            .columns()
+            .iter()
+            .enumerate()
+            .find(|(_, column)| column.column_path().string() == args.column)
+        {
+            let row_group_reader = file_reader
+                .get_row_group(ri)
+                .expect("Unable to read row group");
+            if let Some(sbbf) = row_group_reader
+                .get_column_bloom_filter(column_index)
+                .expect("Failed to parse bloom filter")
+            {
+                args.values.iter().for_each(|value| {
+                    println!(
+                        "Value {} is {} in bloom filter",
+                        value,
+                        if sbbf.check(value.as_str()) {
+                            "present"
+                        } else {
+                            "absent"
+                        }
+                    )
+                });
+            }
+        } else {
+            println!(
+                "No column named {} found, candidate columns are: {}",
+                args.column,
+                row_group
+                    .columns()
+                    .iter()
+                    .map(|c| c.column_path().string())
+                    .collect::<Vec<_>>()
+                    .join(", ")
+            );
+        }
+    }
+}
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index adfd87307..4944a93f8 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -18,13 +18,16 @@
 //! Bloom filter implementation specific to Parquet, as described
 //! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
 
+use crate::data_type::AsBytes;
 use crate::errors::ParquetError;
 use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::reader::ChunkReader;
 use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
 };
+use bytes::{Buf, Bytes};
 use std::hash::Hasher;
-use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;
 
@@ -79,6 +82,37 @@ fn block_check(block: &Block, hash: u32) -> bool {
 /// A split block Bloom filter
 pub struct Sbbf(Vec<Block>);
 
+const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
+
+/// Given an initial offset and a [ChunkReader], try to read a bloom filter header and return
+/// both the header and the offset immediately after it (where the bitset begins).
+fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
+    offset: u64,
+    reader: Arc<R>,
+) -> Result<(BloomFilterHeader, u64), ParquetError> {
+    let buffer = reader.get_bytes(offset as u64, SBBF_HEADER_SIZE_ESTIMATE)?;
+    let (header, length) = read_bloom_filter_header_and_length(buffer)?;
+    Ok((header, offset + length))
+}
+
+/// given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
+/// length of the header.
+#[inline]
+fn read_bloom_filter_header_and_length(
+    buffer: Bytes,
+) -> Result<(BloomFilterHeader, u64), ParquetError> {
+    let total_length = buffer.len();
+    let mut buf_reader = buffer.reader();
+    let mut prot = TCompactInputProtocol::new(&mut buf_reader);
+    let header = BloomFilterHeader::read_from_in_protocol(&mut prot).map_err(|e| {
+        ParquetError::General(format!("Could not read bloom filter header: {}", e))
+    })?;
+    Ok((
+        header,
+        (total_length - buf_reader.into_inner().remaining()) as u64,
+    ))
+}
+
 impl Sbbf {
     fn new(bitset: &[u8]) -> Self {
         let data = bitset
@@ -94,17 +128,20 @@ impl Sbbf {
         Self(data)
     }
 
-    pub fn read_from_column_chunk<R: Read + Seek>(
+    pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
-        mut reader: &mut R,
-    ) -> Result<Self, ParquetError> {
-        let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
-            ParquetError::General("Bloom filter offset is not set".to_string())
-        })? as u64;
-        reader.seek(SeekFrom::Start(offset))?;
-        // deserialize header
-        let mut prot = TCompactInputProtocol::new(&mut reader);
-        let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
+        reader: Arc<R>,
+    ) -> Result<Option<Self>, ParquetError> {
+        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        let (header, bitset_offset) =
+            chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?;
 
         match header.algorithm {
             BloomFilterAlgorithm::BLOCK(_) => {
@@ -125,11 +162,8 @@ impl Sbbf {
         let length: usize = header.num_bytes.try_into().map_err(|_| {
             ParquetError::General("Bloom filter length is invalid".to_string())
         })?;
-        let mut buffer = vec![0_u8; length];
-        reader.read_exact(&mut buffer).map_err(|e| {
-            ParquetError::General(format!("Could not read bloom filter: {}", e))
-        })?;
-        Ok(Self::new(&buffer))
+        let bitset = reader.get_bytes(bitset_offset, length)?;
+        Ok(Some(Self::new(&bitset)))
     }
 
     #[inline]
@@ -139,17 +173,27 @@ impl Sbbf {
         (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
     }
 
+    /// Insert an [AsBytes] value into the filter
+    pub fn insert<T: AsBytes>(&mut self, value: T) {
+        self.insert_hash(hash_as_bytes(value));
+    }
+
     /// Insert a hash into the filter
-    pub fn insert(&mut self, hash: u64) {
+    fn insert_hash(&mut self, hash: u64) {
         let block_index = self.hash_to_block_index(hash);
         let block = &mut self.0[block_index];
         block_insert(block, hash as u32);
     }
 
+    /// Check if an [AsBytes] value is probably present or definitely absent in the filter
+    pub fn check<T: AsBytes>(&self, value: T) -> bool {
+        self.check_hash(hash_as_bytes(value))
+    }
+
     /// Check if a hash is in the filter. May return
     /// true for values that was never inserted ("false positive")
     /// but will always return false if a hash has not been inserted.
-    pub fn check(&self, hash: u64) -> bool {
+    fn check_hash(&self, hash: u64) -> bool {
         let block_index = self.hash_to_block_index(hash);
         let block = &self.0[block_index];
         block_check(block, hash as u32)
@@ -159,19 +203,24 @@ impl Sbbf {
 // per spec we use xxHash with seed=0
 const SEED: u64 = 0;
 
-pub fn hash_bytes<A: AsRef<[u8]>>(value: A) -> u64 {
+#[inline]
+fn hash_as_bytes<A: AsBytes>(value: A) -> u64 {
     let mut hasher = XxHash64::with_seed(SEED);
-    hasher.write(value.as_ref());
+    hasher.write(value.as_bytes());
     hasher.finish()
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::format::{
+        BloomFilterAlgorithm, BloomFilterCompression, SplitBlockAlgorithm, Uncompressed,
+        XxHash,
+    };
 
     #[test]
     fn test_hash_bytes() {
-        assert_eq!(hash_bytes(b""), 17241709254077376921);
+        assert_eq!(hash_as_bytes(""), 17241709254077376921);
     }
 
     #[test]
@@ -210,8 +259,37 @@ mod tests {
         let sbbf = Sbbf::new(bitset);
         for a in 0..10i64 {
             let value = format!("a{}", a);
-            let hash = hash_bytes(value);
-            assert!(sbbf.check(hash));
+            assert!(sbbf.check(value.as_str()));
         }
     }
+
+    /// Test the assumption that the bloom filter header size should not exceed
+    /// SBBF_HEADER_SIZE_ESTIMATE. Essentially, we are testing that the struct is packed with
+    /// 4 i32 fields, each of which can take 1-5 bytes, so altogether it is 20 bytes at most.
+    #[test]
+    fn test_bloom_filter_header_size_assumption() {
+        let buffer: &[u8; 16] =
+            &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
+        let (
+            BloomFilterHeader {
+                algorithm,
+                compression,
+                hash,
+                num_bytes,
+            },
+            read_length,
+        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
+        assert_eq!(read_length, 15);
+        assert_eq!(
+            algorithm,
+            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
+        );
+        assert_eq!(
+            compression,
+            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
+        );
+        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
+        assert_eq!(num_bytes, 32_i32);
+        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
+    }
 }
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 70ff37a41..325944c21 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -21,6 +21,8 @@
 use bytes::Bytes;
 use std::{boxed::Box, io::Read, sync::Arc};
 
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::column::page::PageIterator;
 use crate::column::{page::PageReader, reader::ColumnReader};
 use crate::errors::{ParquetError, Result};
@@ -143,6 +145,10 @@ pub trait RowGroupReader: Send + Sync {
         Ok(col_reader)
     }
 
+    #[cfg(feature = "bloom")]
+    /// Get bloom filter for the `i`th column chunk, if present.
+    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;
+
     /// Get iterator of `Row`s from this row group.
     ///
     /// Projected schema can be a subset of or equal to the file schema, when it is None,
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index ebe87aca6..cb39dd194 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -22,11 +22,9 @@ use std::collections::VecDeque;
 use std::io::Cursor;
 use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
 
-use crate::format::{PageHeader, PageLocation, PageType};
-use bytes::{Buf, Bytes};
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
-
 use crate::basic::{Encoding, Type};
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
@@ -38,11 +36,14 @@ use crate::file::{
     reader::*,
     statistics,
 };
+use crate::format::{PageHeader, PageLocation, PageType};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
 use crate::util::{io::TryClone, memory::ByteBufferPtr};
-// export `SliceableCursor` and `FileSource` publically so clients can
+use bytes::{Buf, Bytes};
+use thrift::protocol::{TCompactInputProtocol, TSerializable};
+// export `SliceableCursor` and `FileSource` publicly so clients can
 // re-use the logic in their own ParquetFileWriter wrappers
 pub use crate::util::io::FileSource;
 
@@ -387,6 +388,13 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
         )?))
     }
 
+    #[cfg(feature = "bloom")]
+    /// get bloom filter for the `i`th column
+    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>> {
+        let col = self.metadata.column(i);
+        Sbbf::read_from_column_chunk(col, self.chunk_reader.clone())
+    }
+
     fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
         RowIter::from_row_group(projection, self)
     }