Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/16 03:04:45 UTC
[arrow-rs] branch master updated: parquet bloom filter part II: read sbbf bitset from row group reader, update API, and add cli demo (#3102)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 73d66d837 parquet bloom filter part II: read sbbf bitset from row group reader, update API, and add cli demo (#3102)
73d66d837 is described below
commit 73d66d837c20e3b80a77fdad5018f7872de4ef9d
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Wed Nov 16 11:04:40 2022 +0800
parquet bloom filter part II: read sbbf bitset from row group reader, update API, and add cli demo (#3102)
* add feature flag
* add api
* fix reading with chunk reader
* refactor
* add a binary to demo
* add bin
* remove unused
* fix clippy
* adjust byte size
* update read method
* parquet-show-bloom-filter with bloom feature required
* remove extern crate
* get rid of loop read
* refactor to test
* rework api
* remove unused trait
* update help
---
parquet/Cargo.toml | 4 +
parquet/README.md | 1 +
parquet/src/bin/parquet-read.rs | 2 -
parquet/src/bin/parquet-rowcount.rs | 1 -
parquet/src/bin/parquet-schema.rs | 1 -
parquet/src/bin/parquet-show-bloom-filter.rs | 110 ++++++++++++++++++++++++
parquet/src/bloom_filter/mod.rs | 124 ++++++++++++++++++++++-----
parquet/src/file/reader.rs | 6 ++
parquet/src/file/serialized_reader.rs | 18 ++--
9 files changed, 235 insertions(+), 32 deletions(-)
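Before the diff, a minimal sketch of how the new reader API fits together (assuming a local file named data.parquet and a build of the crate at this commit with the `bloom` feature; the file name and column index 0 are placeholders):

    use parquet::file::reader::{FileReader, SerializedFileReader};
    use std::fs::File;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Open the file and parse the Parquet footer metadata.
        let file = File::open("data.parquet")?;
        let reader = SerializedFileReader::new(file)?;
        for i in 0..reader.metadata().num_row_groups() {
            let row_group = reader.get_row_group(i)?;
            // New in this commit: Ok(None) when the column chunk carries no
            // bloom filter, Ok(Some(sbbf)) when one was read successfully.
            if let Some(sbbf) = row_group.get_column_bloom_filter(0)? {
                // check() may report a false positive, never a false negative.
                println!("row group {}: maybe contains \"a\": {}", i, sbbf.check("a"));
            }
        }
        Ok(())
    }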
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index a5d43bf54..fc7c8218a 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -113,6 +113,10 @@ required-features = ["cli"]
name = "parquet-fromcsv"
required-features = ["arrow", "cli"]
+[[bin]]
+name = "parquet-show-bloom-filter"
+required-features = ["cli", "bloom"]
+
[[bench]]
name = "arrow_writer"
required-features = ["arrow"]
diff --git a/parquet/README.md b/parquet/README.md
index d904fc64e..c9245b082 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -41,6 +41,7 @@ However, for historical reasons, this crate uses versions with major numbers gre
The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:
- `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
+- `bloom` (default) - support for [split block bloom filter](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) for reading from / writing to parquet
- `async` - support `async` APIs for reading parquet
- `json` - support for reading / writing `json` data to / from parquet
- `brotli` (default) - support for parquet using `brotli` compression
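As a sketch, a downstream Cargo.toml opting into the feature might look like this (`bloom` is listed as a default feature above, so naming it explicitly only matters when default features are disabled; the version number is illustrative):

    [dependencies]
    parquet = { version = "27", default-features = false, features = ["bloom"] }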
diff --git a/parquet/src/bin/parquet-read.rs b/parquet/src/bin/parquet-read.rs
index cf8009956..117f9ee0b 100644
--- a/parquet/src/bin/parquet-read.rs
+++ b/parquet/src/bin/parquet-read.rs
@@ -36,8 +36,6 @@
//! Note that `parquet-read` reads the full file schema; no projection or filtering is
//! applied.
-extern crate parquet;
-
use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::Row;
diff --git a/parquet/src/bin/parquet-rowcount.rs b/parquet/src/bin/parquet-rowcount.rs
index 491f582c5..5069d4b25 100644
--- a/parquet/src/bin/parquet-rowcount.rs
+++ b/parquet/src/bin/parquet-rowcount.rs
@@ -36,7 +36,6 @@
//! Note that `parquet-rowcount` reads the full file schema; no projection or filtering is
//! applied.
-extern crate parquet;
use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::{fs::File, path::Path};
diff --git a/parquet/src/bin/parquet-schema.rs b/parquet/src/bin/parquet-schema.rs
index cd8e76922..ff7798a91 100644
--- a/parquet/src/bin/parquet-schema.rs
+++ b/parquet/src/bin/parquet-schema.rs
@@ -36,7 +36,6 @@
//! Note that `verbose` is an optional boolean flag: when not provided, only the schema
//! is printed; when provided, the full file metadata is printed.
-extern crate parquet;
use clap::Parser;
use parquet::{
file::reader::{FileReader, SerializedFileReader},
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
new file mode 100644
index 000000000..a4dbdbe67
--- /dev/null
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary file to read bloom filter data from a Parquet file.
+//!
+//! # Install
+//!
+//! `parquet-show-bloom-filter` can be installed using `cargo`:
+//! ```
+//! cargo install parquet --features=cli
+//! ```
+//! After this `parquet-show-bloom-filter` should be available:
+//! ```
+//! parquet-show-bloom-filter --file-name XYZ.parquet --column id --values a
+//! ```
+//!
+//! The binary can also be built from the source code and run as follows:
+//! ```
+//! cargo run --features=cli --bin parquet-show-bloom-filter -- --file-name XYZ.parquet --column id --values a
+//! ```
+
+use clap::Parser;
+use parquet::file::reader::{FileReader, SerializedFileReader};
+use std::{fs::File, path::Path};
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("Binary file to read bloom filter data from a Parquet file"), long_about = None)]
+struct Args {
+ #[clap(short, long, help("Path to the parquet file"))]
+ file_name: String,
+ #[clap(
+ short,
+ long,
+ help("Check the bloom filter indexes for the given column")
+ )]
+ column: String,
+ #[clap(
+ short,
+ long,
+ help("Check if the given values match bloom filter, the values will be evaluated as strings"),
+ required = true
+ )]
+ values: Vec<String>,
+}
+
+fn main() {
+ let args = Args::parse();
+ let file_name = args.file_name;
+ let path = Path::new(&file_name);
+ let file = File::open(path).expect("Unable to open file");
+
+ let file_reader =
+ SerializedFileReader::new(file).expect("Unable to open file as Parquet");
+ let metadata = file_reader.metadata();
+ for (ri, row_group) in metadata.row_groups().iter().enumerate() {
+ println!("Row group #{}", ri);
+ println!("{}", "=".repeat(80));
+ if let Some((column_index, _)) = row_group
+ .columns()
+ .iter()
+ .enumerate()
+ .find(|(_, column)| column.column_path().string() == args.column)
+ {
+ let row_group_reader = file_reader
+ .get_row_group(ri)
+ .expect("Unable to read row group");
+ if let Some(sbbf) = row_group_reader
+ .get_column_bloom_filter(column_index)
+ .expect("Failed to parse bloom filter")
+ {
+ args.values.iter().for_each(|value| {
+ println!(
+ "Value {} is {} in bloom filter",
+ value,
+ if sbbf.check(value.as_str()) {
+ "present"
+ } else {
+ "absent"
+ }
+ )
+ });
+ }
+ } else {
+ println!(
+ "No column named {} found, candidate columns are: {}",
+ args.column,
+ row_group
+ .columns()
+ .iter()
+ .map(|c| c.column_path().string())
+ .collect::<Vec<_>>()
+ .join(", ")
+ );
+ }
+ }
+}
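A hypothetical session with the new binary (the file name, column, and values are placeholders; the output lines follow the println! formats in the source above):

    $ cargo run --features="cli,bloom" --bin parquet-show-bloom-filter -- \
        --file-name data.parquet --column id --values a --values b
    Row group #0
    ================================================================================
    Value a is present in bloom filter
    Value b is absent in bloom filter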
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index adfd87307..4944a93f8 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -18,13 +18,16 @@
//! Bloom filter implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
+use crate::data_type::AsBytes;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
};
+use bytes::{Buf, Bytes};
use std::hash::Hasher;
-use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
use thrift::protocol::{TCompactInputProtocol, TSerializable};
use twox_hash::XxHash64;
@@ -79,6 +82,37 @@ fn block_check(block: &Block, hash: u32) -> bool {
/// A split block Bloom filter
pub struct Sbbf(Vec<Block>);
+const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
+
+/// Given an initial offset and a [ChunkReader], try to read out a bloom filter header and return
+/// both the header and the offset just after it (where the bitset starts).
+fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
+ offset: u64,
+ reader: Arc<R>,
+) -> Result<(BloomFilterHeader, u64), ParquetError> {
+ let buffer = reader.get_bytes(offset as u64, SBBF_HEADER_SIZE_ESTIMATE)?;
+ let (header, length) = read_bloom_filter_header_and_length(buffer)?;
+ Ok((header, offset + length))
+}
+
+/// Given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
+/// the length of the header.
+#[inline]
+fn read_bloom_filter_header_and_length(
+ buffer: Bytes,
+) -> Result<(BloomFilterHeader, u64), ParquetError> {
+ let total_length = buffer.len();
+ let mut buf_reader = buffer.reader();
+ let mut prot = TCompactInputProtocol::new(&mut buf_reader);
+ let header = BloomFilterHeader::read_from_in_protocol(&mut prot).map_err(|e| {
+ ParquetError::General(format!("Could not read bloom filter header: {}", e))
+ })?;
+ Ok((
+ header,
+ (total_length - buf_reader.into_inner().remaining()) as u64,
+ ))
+}
+
impl Sbbf {
fn new(bitset: &[u8]) -> Self {
let data = bitset
@@ -94,17 +128,20 @@ impl Sbbf {
Self(data)
}
- pub fn read_from_column_chunk<R: Read + Seek>(
+ pub fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
- mut reader: &mut R,
- ) -> Result<Self, ParquetError> {
- let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
- ParquetError::General("Bloom filter offset is not set".to_string())
- })? as u64;
- reader.seek(SeekFrom::Start(offset))?;
- // deserialize header
- let mut prot = TCompactInputProtocol::new(&mut reader);
- let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
+ reader: Arc<R>,
+ ) -> Result<Option<Self>, ParquetError> {
+ let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
+ offset.try_into().map_err(|_| {
+ ParquetError::General("Bloom filter offset is invalid".to_string())
+ })?
+ } else {
+ return Ok(None);
+ };
+
+ let (header, bitset_offset) =
+ chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?;
match header.algorithm {
BloomFilterAlgorithm::BLOCK(_) => {
@@ -125,11 +162,8 @@ impl Sbbf {
let length: usize = header.num_bytes.try_into().map_err(|_| {
ParquetError::General("Bloom filter length is invalid".to_string())
})?;
- let mut buffer = vec![0_u8; length];
- reader.read_exact(&mut buffer).map_err(|e| {
- ParquetError::General(format!("Could not read bloom filter: {}", e))
- })?;
- Ok(Self::new(&buffer))
+ let bitset = reader.get_bytes(bitset_offset, length)?;
+ Ok(Some(Self::new(&bitset)))
}
#[inline]
@@ -139,17 +173,27 @@ impl Sbbf {
(((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
}
+ /// Insert an [AsBytes] value into the filter
+ pub fn insert<T: AsBytes>(&mut self, value: T) {
+ self.insert_hash(hash_as_bytes(value));
+ }
+
/// Insert a hash into the filter
- pub fn insert(&mut self, hash: u64) {
+ fn insert_hash(&mut self, hash: u64) {
let block_index = self.hash_to_block_index(hash);
let block = &mut self.0[block_index];
block_insert(block, hash as u32);
}
+ /// Check if an [AsBytes] value is probably present or definitely absent in the filter
+ pub fn check<T: AsBytes>(&self, value: T) -> bool {
+ self.check_hash(hash_as_bytes(value))
+ }
+
/// Check if a hash is in the filter. May return
/// true for a hash that was never inserted ("false positive"),
/// but will never return false for a hash that was inserted.
- pub fn check(&self, hash: u64) -> bool {
+ fn check_hash(&self, hash: u64) -> bool {
let block_index = self.hash_to_block_index(hash);
let block = &self.0[block_index];
block_check(block, hash as u32)
@@ -159,19 +203,24 @@ impl Sbbf {
// per spec we use xxHash with seed=0
const SEED: u64 = 0;
-pub fn hash_bytes<A: AsRef<[u8]>>(value: A) -> u64 {
+#[inline]
+fn hash_as_bytes<A: AsBytes>(value: A) -> u64 {
let mut hasher = XxHash64::with_seed(SEED);
- hasher.write(value.as_ref());
+ hasher.write(value.as_bytes());
hasher.finish()
}
#[cfg(test)]
mod tests {
use super::*;
+ use crate::format::{
+ BloomFilterAlgorithm, BloomFilterCompression, SplitBlockAlgorithm, Uncompressed,
+ XxHash,
+ };
#[test]
fn test_hash_bytes() {
- assert_eq!(hash_bytes(b""), 17241709254077376921);
+ assert_eq!(hash_as_bytes(""), 17241709254077376921);
}
#[test]
@@ -210,8 +259,37 @@ mod tests {
let sbbf = Sbbf::new(bitset);
for a in 0..10i64 {
let value = format!("a{}", a);
- let hash = hash_bytes(value);
- assert!(sbbf.check(hash));
+ assert!(sbbf.check(value.as_str()));
}
}
+
+ /// Test the assumption that the bloom filter header size does not exceed
+ /// SBBF_HEADER_SIZE_ESTIMATE. Essentially we are testing that the struct is packed
+ /// with 4 i32 fields, each taking 1-5 bytes, so altogether it is 20 bytes at most.
+ #[test]
+ fn test_bloom_filter_header_size_assumption() {
+ let buffer: &[u8; 16] =
+ &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
+ let (
+ BloomFilterHeader {
+ algorithm,
+ compression,
+ hash,
+ num_bytes,
+ },
+ read_length,
+ ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
+ assert_eq!(read_length, 15);
+ assert_eq!(
+ algorithm,
+ BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
+ );
+ assert_eq!(
+ compression,
+ BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
+ );
+ assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
+ assert_eq!(num_bytes, 32_i32);
+ assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
+ }
}
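For the curious, a back-of-the-envelope sketch (not part of the commit) of where the 20-byte estimate verified by the test above comes from: thrift's compact protocol encodes each of the header's four i32 fields as a varint carrying 7 payload bits per byte, so each field takes at most ceil(32 / 7) = 5 bytes:

    // Worst-case varint length for an n-bit integer, 7 payload bits per byte.
    fn max_varint_len(bits: u32) -> u32 {
        (bits + 6) / 7
    }

    fn main() {
        let per_field = max_varint_len(32); // 5 bytes
        assert_eq!(per_field * 4, 20); // matches SBBF_HEADER_SIZE_ESTIMATE
    }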
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 70ff37a41..325944c21 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -21,6 +21,8 @@
use bytes::Bytes;
use std::{boxed::Box, io::Read, sync::Arc};
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
use crate::column::page::PageIterator;
use crate::column::{page::PageReader, reader::ColumnReader};
use crate::errors::{ParquetError, Result};
@@ -143,6 +145,10 @@ pub trait RowGroupReader: Send + Sync {
Ok(col_reader)
}
+ #[cfg(feature = "bloom")]
+ /// Get the bloom filter for the `i`th column chunk, if present.
+ fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;
+
/// Get iterator of `Row`s from this row group.
///
/// Projected schema can be a subset of or equal to the file schema, when it is None,
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index ebe87aca6..cb39dd194 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -22,11 +22,9 @@ use std::collections::VecDeque;
use std::io::Cursor;
use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
-use crate::format::{PageHeader, PageLocation, PageType};
-use bytes::{Buf, Bytes};
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
-
use crate::basic::{Encoding, Type};
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
@@ -38,11 +36,14 @@ use crate::file::{
reader::*,
statistics,
};
+use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::util::{io::TryClone, memory::ByteBufferPtr};
-// export `SliceableCursor` and `FileSource` publically so clients can
+use bytes::{Buf, Bytes};
+use thrift::protocol::{TCompactInputProtocol, TSerializable};
+// export `SliceableCursor` and `FileSource` publicly so clients can
// re-use the logic in their own ParquetFileWriter wrappers
pub use crate::util::io::FileSource;
@@ -387,6 +388,13 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
)?))
}
+ #[cfg(feature = "bloom")]
+ /// Get the bloom filter for the `i`th column chunk, if present.
+ fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>> {
+ let col = self.metadata.column(i);
+ Sbbf::read_from_column_chunk(col, self.chunk_reader.clone())
+ }
+
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
RowIter::from_row_group(projection, self)
}