Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/22 12:07:03 UTC
[arrow-rs] branch master updated: parquet bloom filter part III: add sbbf writer, remove `bloom` default feature, add reader properties (#3119)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new e214ccccc parquet bloom filter part III: add sbbf writer, remove `bloom` default feature, add reader properties (#3119)
e214ccccc is described below
commit e214ccccc702d0295fbf59258a6a817cd09ac4ea
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Tue Nov 22 20:06:57 2022 +0800
parquet bloom filter part III: add sbbf writer, remove `bloom` default feature, add reader properties (#3119)
* bloom filter part III
- add reader properties
- add writer properties
- remove `bloom` feature
* update row group vec
* fix clippy
* fix clippy
* remove default feature for twox
* incorporate ndv and fpp
* fix doc
* add unit test
* fix clippy
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* remove underflow logic
* refactor write
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
parquet/Cargo.toml | 9 +-
parquet/src/bin/parquet-show-bloom-filter.rs | 5 +-
parquet/src/bloom_filter/mod.rs | 125 +++++++++++++++++++++-
parquet/src/column/writer/mod.rs | 22 ++++
parquet/src/file/metadata.rs | 2 +-
parquet/src/file/properties.rs | 151 ++++++++++++++++++++++++---
parquet/src/file/reader.rs | 7 +-
parquet/src/file/serialized_reader.rs | 27 +++--
parquet/src/file/writer.rs | 46 +++++++-
parquet/src/lib.rs | 1 -
10 files changed, 353 insertions(+), 42 deletions(-)
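Before the diff, a minimal writer-side sketch of the new per-column bloom filter properties. The builder methods and `ColumnPath` come from this change; the column name and values are illustrative, not taken from the PR's tests.

    use parquet::file::properties::WriterProperties;
    use parquet::schema::types::ColumnPath;

    // Enable a bloom filter on column "col" with a 1% false positive target.
    let props = WriterProperties::builder()
        .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
        .set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.01)
        .build();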
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 515da585e..7a150c949 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -57,7 +57,8 @@ seq-macro = { version = "0.3", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
hashbrown = { version = "0.13", default-features = false }
-twox-hash = { version = "1.6", optional = true }
+twox-hash = { version = "1.6", default-features = false }
+paste = { version = "1.0" }
[dev-dependencies]
base64 = { version = "0.13", default-features = false, features = ["std"] }
@@ -77,7 +78,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
all-features = true
[features]
-default = ["arrow", "bloom", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
+default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
# Enable arrow reader/writer APIs
arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
# Enable CLI tools
@@ -90,8 +91,6 @@ test_common = ["arrow/test_utils"]
experimental = []
# Enable async APIs
async = ["futures", "tokio"]
-# Bloomfilter
-bloom = ["twox-hash"]
[[test]]
name = "arrow_writer_layout"
@@ -115,7 +114,7 @@ required-features = ["arrow", "cli"]
[[bin]]
name = "parquet-show-bloom-filter"
-required-features = ["cli", "bloom"]
+required-features = ["cli"]
[[bench]]
name = "arrow_writer"
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index a4dbdbe67..28493a94c 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -78,10 +78,7 @@ fn main() {
let row_group_reader = file_reader
.get_row_group(ri)
.expect("Unable to read row group");
- if let Some(sbbf) = row_group_reader
- .get_column_bloom_filter(column_index)
- .expect("Failed to parse bloom filter")
- {
+ if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
args.values.iter().for_each(|value| {
println!(
"Value {} is {} in bloom filter",
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 4944a93f8..4efba3834 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -24,11 +24,15 @@ use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+ SplitBlockAlgorithm, Uncompressed, XxHash,
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
+use std::io::Write;
use std::sync::Arc;
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
+use thrift::protocol::{
+ TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
+};
use twox_hash::XxHash64;
/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach)
@@ -80,6 +84,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
}
/// A split block Bloom filter
+#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);
const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
@@ -113,7 +118,43 @@ fn read_bloom_filter_header_and_length(
))
}
+const BITSET_MIN_LENGTH: usize = 32;
+const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
+
+#[inline]
+fn optimal_num_of_bytes(num_bytes: usize) -> usize {
+ let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
+ let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
+ num_bytes.next_power_of_two()
+}
+
+// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
+// given fpp = (1 - e^(-k * n / m)) ^ k
+// we have m = - k * n / ln(1 - fpp ^ (1 / k))
+// where k = number of hash functions, m = number of bits, n = number of distinct values
+#[inline]
+fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
+ let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
+ num_bits as usize
+}
+
impl Sbbf {
+ /// Create a new [Sbbf] with the given number of distinct values and false positive probability.
+ /// Will panic if `fpp` is not in the range `[0.0, 1.0)`.
+ pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
+ assert!((0.0..1.0).contains(&fpp), "invalid fpp: {}", fpp);
+ let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
+ Self::new_with_num_of_bytes(num_bits / 8)
+ }
+
+ /// Create a new [Sbbf] with the given number of bytes; the exact number is rounded up
+ /// to the next power of two, clamped between `BITSET_MIN_LENGTH` and `BITSET_MAX_LENGTH`.
+ pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+ let num_bytes = optimal_num_of_bytes(num_bytes);
+ let bitset = vec![0_u8; num_bytes];
+ Self::new(&bitset)
+ }
+
fn new(bitset: &[u8]) -> Self {
let data = bitset
.chunks_exact(4 * 8)
@@ -128,6 +169,45 @@ impl Sbbf {
Self(data)
}
+ /// Write the bloom filter data (header and then bitset) to the output
+ pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+ let mut protocol = TCompactOutputProtocol::new(&mut writer);
+ let header = self.header();
+ header.write_to_out_protocol(&mut protocol).map_err(|e| {
+ ParquetError::General(format!("Could not write bloom filter header: {}", e))
+ })?;
+ protocol.flush()?;
+ self.write_bitset(&mut writer)?;
+ Ok(())
+ }
+
+ /// Write the bitset in serialized form to the writer.
+ fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+ for block in &self.0 {
+ for word in block {
+ writer.write_all(&word.to_le_bytes()).map_err(|e| {
+ ParquetError::General(format!(
+ "Could not write bloom filter bit set: {}",
+ e
+ ))
+ })?;
+ }
+ }
+ Ok(())
+ }
+
+ /// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
+ fn header(&self) -> BloomFilterHeader {
+ BloomFilterHeader {
+ // 8 i32 per block, 4 bytes per i32
+ num_bytes: self.0.len() as i32 * 4 * 8,
+ algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
+ hash: BloomFilterHash::XXHASH(XxHash {}),
+ compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
+ }
+ }
+
+ /// Read a bloom filter from the reader, at the offset recorded in the column chunk metadata.
pub fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
reader: Arc<R>,
@@ -292,4 +372,47 @@ mod tests {
assert_eq!(num_bytes, 32_i32);
assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
}
+
+ #[test]
+ fn test_optimal_num_of_bytes() {
+ for (input, expected) in &[
+ (0, 32),
+ (9, 32),
+ (31, 32),
+ (32, 32),
+ (33, 64),
+ (99, 128),
+ (1024, 1024),
+ (999_000_000, 128 * 1024 * 1024),
+ ] {
+ assert_eq!(*expected, optimal_num_of_bytes(*input));
+ }
+ }
+
+ #[test]
+ fn test_num_of_bits_from_ndv_fpp() {
+ for (fpp, ndv, num_bits) in &[
+ (0.1, 10, 57),
+ (0.01, 10, 96),
+ (0.001, 10, 146),
+ (0.1, 100, 577),
+ (0.01, 100, 968),
+ (0.001, 100, 1460),
+ (0.1, 1000, 5772),
+ (0.01, 1000, 9681),
+ (0.001, 1000, 14607),
+ (0.1, 10000, 57725),
+ (0.01, 10000, 96815),
+ (0.001, 10000, 146076),
+ (0.1, 100000, 577254),
+ (0.01, 100000, 968152),
+ (0.001, 100000, 1460769),
+ (0.1, 1000000, 5772541),
+ (0.01, 1000000, 9681526),
+ (0.001, 1000000, 14607697),
+ (1e-50, 1_000_000_000_000, 14226231280773240832),
+ ] {
+ assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
+ }
+ }
}
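A worked example of the sizing math above, using one row of the unit test table: for ndv = 100 and fpp = 0.01 with k = 8 hash functions, m = -8 * 100 / ln(1 - 0.01^(1/8)) ≈ 968 bits, i.e. 121 bytes. `optimal_num_of_bytes` then rounds this up to the next power of two within [32, 128 * 1024 * 1024], so the final bitset is 128 bytes, or four 32-byte blocks.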
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3cdf04f54..ae7920e22 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -16,6 +16,8 @@
// under the License.
//! Contains column writer API.
+
+use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
@@ -154,6 +156,8 @@ pub struct ColumnCloseResult {
pub rows_written: u64,
/// Metadata for this column chunk
pub metadata: ColumnChunkMetaData,
+ /// Optional bloom filter for this column
+ pub bloom_filter: Option<Sbbf>,
/// Optional column index, for filtering
pub column_index: Option<ColumnIndex>,
/// Optional offset index, identifying page locations
@@ -209,6 +213,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,
+ // bloom filter
+ bloom_filter: Option<Sbbf>,
+
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
@@ -231,6 +238,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Used for level information
encodings.insert(Encoding::RLE);
+ let bloom_filter_enabled = props.bloom_filter_enabled(descr.path());
+ let bloom_filter = if bloom_filter_enabled {
+ if let Some(ndv) = props.bloom_filter_ndv(descr.path()) {
+ let fpp = props.bloom_filter_fpp(descr.path());
+ Some(Sbbf::new_with_ndv_fpp(ndv, fpp))
+ } else {
+ let max_bytes = props.bloom_filter_max_bytes(descr.path());
+ Some(Sbbf::new_with_num_of_bytes(max_bytes as usize))
+ }
+ } else {
+ None
+ };
+
Self {
descr,
props,
@@ -260,6 +280,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_column_nulls: 0,
column_distinct_count: None,
},
+ bloom_filter,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
@@ -458,6 +479,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Ok(ColumnCloseResult {
bytes_written: self.column_metrics.total_bytes_written,
rows_written: self.column_metrics.total_rows_written,
+ bloom_filter: self.bloom_filter,
metadata,
column_index,
offset_index,
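The branch above sizes the filter from ndv and fpp when an expected distinct-value count is configured, and otherwise falls back to the per-column byte budget. A sketch of the two configurations, assuming the setters added in properties.rs below; the values are illustrative:

    // ndv set: Sbbf::new_with_ndv_fpp sizes the filter from the formula
    let props = WriterProperties::builder()
        .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
        .set_column_bloom_filter_ndv(ColumnPath::from("col"), 1_000_000)
        .build();

    // ndv not set: Sbbf::new_with_num_of_bytes uses bloom_filter_max_bytes (default 1 MiB)
    let props = WriterProperties::builder()
        .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
        .set_column_bloom_filter_max_bytes(ColumnPath::from("col"), 1024)
        .build();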
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 895776a8a..2ba50fa31 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -236,7 +236,7 @@ pub struct RowGroupMetaData {
}
impl RowGroupMetaData {
- /// Returns builer for row group metadata.
+ /// Returns builder for row group metadata.
pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
RowGroupMetaDataBuilder::new(schema_descr)
}
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c65ba8035..03117d4cb 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -64,6 +64,7 @@
//! .build();
//! ```
+use paste::paste;
use std::{collections::HashMap, sync::Arc};
use crate::basic::{Compression, Encoding};
@@ -82,6 +83,9 @@ const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
+const DEFAULT_BLOOM_FILTER_ENABLED: bool = false;
+const DEFAULT_BLOOM_FILTER_MAX_BYTES: u32 = 1024 * 1024;
+const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.01;
/// Parquet writer version.
///
@@ -125,6 +129,26 @@ pub struct WriterProperties {
sorting_columns: Option<Vec<SortingColumn>>,
}
+macro_rules! def_col_property_getter {
+ ($field:ident, $field_type:ty) => {
+ pub fn $field(&self, col: &ColumnPath) -> Option<$field_type> {
+ self.column_properties
+ .get(col)
+ .and_then(|c| c.$field())
+ .or_else(|| self.default_column_properties.$field())
+ }
+ };
+ ($field:ident, $field_type:ty, $default_val:expr) => {
+ pub fn $field(&self, col: &ColumnPath) -> $field_type {
+ self.column_properties
+ .get(col)
+ .and_then(|c| c.$field())
+ .or_else(|| self.default_column_properties.$field())
+ .unwrap_or($default_val)
+ }
+ };
+}
+
impl WriterProperties {
/// Returns builder for writer properties with default values.
pub fn builder() -> WriterPropertiesBuilder {
@@ -255,6 +279,11 @@ impl WriterProperties {
.or_else(|| self.default_column_properties.max_statistics_size())
.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE)
}
+
+ def_col_property_getter!(bloom_filter_enabled, bool, DEFAULT_BLOOM_FILTER_ENABLED);
+ def_col_property_getter!(bloom_filter_fpp, f64, DEFAULT_BLOOM_FILTER_FPP);
+ def_col_property_getter!(bloom_filter_ndv, u64);
+ def_col_property_getter!(bloom_filter_max_bytes, u32, DEFAULT_BLOOM_FILTER_MAX_BYTES);
}
/// Writer properties builder.
@@ -272,6 +301,52 @@ pub struct WriterPropertiesBuilder {
sorting_columns: Option<Vec<SortingColumn>>,
}
+macro_rules! def_opt_field_setter {
+ ($field: ident, $type: ty) => {
+ paste! {
+ pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
+ self.$field = Some(value);
+ self
+ }
+ }
+ };
+ ($field: ident, $type: ty, $min_value:expr, $max_value:expr) => {
+ paste! {
+ pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
+ if ($min_value..=$max_value).contains(&value) {
+ self.$field = Some(value);
+ } else {
+ self.$field = None
+ }
+ self
+ }
+ }
+ };
+}
+
+macro_rules! def_opt_field_getter {
+ ($field: ident, $type: ty) => {
+ paste! {
+ #[doc = "Returns " $field " if set."]
+ pub fn $field(&self) -> Option<$type> {
+ self.$field
+ }
+ }
+ };
+}
+
+macro_rules! def_per_col_setter {
+ ($field:ident, $field_type:ty) => {
+ paste! {
+ #[doc = "Sets " $field " for a column. Takes precedence over globally defined settings."]
+ pub fn [<set_column_ $field>](mut self, col: ColumnPath, value: $field_type) -> Self {
+ self.get_mut_props(col).[<set_ $field>](value);
+ self
+ }
+ }
+ }
+}
+
impl WriterPropertiesBuilder {
/// Returns default state of the builder.
fn with_defaults() -> Self {
@@ -284,7 +359,7 @@ impl WriterPropertiesBuilder {
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
key_value_metadata: None,
- default_column_properties: ColumnProperties::new(),
+ default_column_properties: Default::default(),
column_properties: HashMap::new(),
sorting_columns: None,
}
@@ -439,7 +514,7 @@ impl WriterPropertiesBuilder {
fn get_mut_props(&mut self, col: ColumnPath) -> &mut ColumnProperties {
self.column_properties
.entry(col)
- .or_insert_with(ColumnProperties::new)
+ .or_insert_with(Default::default)
}
/// Sets encoding for a column.
@@ -492,6 +567,11 @@ impl WriterPropertiesBuilder {
self.get_mut_props(col).set_max_statistics_size(value);
self
}
+
+ def_per_col_setter!(bloom_filter_enabled, bool);
+ def_per_col_setter!(bloom_filter_fpp, f64);
+ def_per_col_setter!(bloom_filter_max_bytes, u32);
+ def_per_col_setter!(bloom_filter_ndv, u64);
}
/// Controls the level of statistics to be computed by the writer
@@ -515,27 +595,24 @@ impl Default for EnabledStatistics {
///
/// If a field is `None`, it means that no specific value has been set for this column,
/// so some subsequent or default value must be used.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, Default, PartialEq)]
struct ColumnProperties {
encoding: Option<Encoding>,
codec: Option<Compression>,
dictionary_enabled: Option<bool>,
statistics_enabled: Option<EnabledStatistics>,
max_statistics_size: Option<usize>,
+ /// bloom filter enabled
+ bloom_filter_enabled: Option<bool>,
+ /// bloom filter expected number of distinct values
+ bloom_filter_ndv: Option<u64>,
+ /// bloom filter false positive probability
+ bloom_filter_fpp: Option<f64>,
+ /// bloom filter max number of bytes
+ bloom_filter_max_bytes: Option<u32>,
}
impl ColumnProperties {
- /// Initialise column properties with default values.
- fn new() -> Self {
- Self {
- encoding: None,
- codec: None,
- dictionary_enabled: None,
- statistics_enabled: None,
- max_statistics_size: None,
- }
- }
-
/// Sets encoding for this column.
///
/// If dictionary is not enabled, this is treated as a primary encoding for a column.
@@ -572,6 +649,11 @@ impl ColumnProperties {
self.max_statistics_size = Some(value);
}
+ def_opt_field_setter!(bloom_filter_enabled, bool);
+ def_opt_field_setter!(bloom_filter_fpp, f64, 0.0, 1.0);
+ def_opt_field_setter!(bloom_filter_max_bytes, u32);
+ def_opt_field_setter!(bloom_filter_ndv, u64);
+
/// Returns optional encoding for this column.
fn encoding(&self) -> Option<Encoding> {
self.encoding
@@ -599,17 +681,25 @@ impl ColumnProperties {
fn max_statistics_size(&self) -> Option<usize> {
self.max_statistics_size
}
+
+ def_opt_field_getter!(bloom_filter_enabled, bool);
+ def_opt_field_getter!(bloom_filter_fpp, f64);
+ def_opt_field_getter!(bloom_filter_max_bytes, u32);
+ def_opt_field_getter!(bloom_filter_ndv, u64);
}
/// Reference counted reader properties.
pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
+const DEFAULT_READ_BLOOM_FILTER: bool = false;
+
/// Reader properties.
///
/// All properties are immutable and `Send` + `Sync`.
/// Use [`ReaderPropertiesBuilder`] to assemble these properties.
pub struct ReaderProperties {
codec_options: CodecOptions,
+ read_bloom_filter: bool,
}
impl ReaderProperties {
@@ -622,11 +712,17 @@ impl ReaderProperties {
pub(crate) fn codec_options(&self) -> &CodecOptions {
&self.codec_options
}
+
+ /// Returns whether bloom filters will be read
+ pub(crate) fn read_bloom_filter(&self) -> bool {
+ self.read_bloom_filter
+ }
}
/// Reader properties builder.
pub struct ReaderPropertiesBuilder {
codec_options_builder: CodecOptionsBuilder,
+ read_bloom_filter: Option<bool>,
}
/// Reader properties builder.
@@ -635,6 +731,7 @@ impl ReaderPropertiesBuilder {
fn with_defaults() -> Self {
Self {
codec_options_builder: CodecOptionsBuilder::default(),
+ read_bloom_filter: None,
}
}
@@ -642,6 +739,9 @@ impl ReaderPropertiesBuilder {
pub fn build(self) -> ReaderProperties {
ReaderProperties {
codec_options: self.codec_options_builder.build(),
+ read_bloom_filter: self
+ .read_bloom_filter
+ .unwrap_or(DEFAULT_READ_BLOOM_FILTER),
}
}
@@ -659,6 +759,17 @@ impl ReaderPropertiesBuilder {
.set_backward_compatible_lz4(value);
self
}
+
+ /// Enable/disable reading bloom filters
+ ///
+ /// If enabled, bloom filters are read from the file when the row group reader
+ /// is created, and can then be retrieved with `get_column_bloom_filter`.
+ ///
+ /// By default bloom filters are not read (`DEFAULT_READ_BLOOM_FILTER` is `false`).
+ pub fn set_read_bloom_filter(mut self, value: bool) -> Self {
+ self.read_bloom_filter = Some(value);
+ self
+ }
}
#[cfg(test)]
@@ -701,6 +812,13 @@ mod tests {
props.max_statistics_size(&ColumnPath::from("col")),
DEFAULT_MAX_STATISTICS_SIZE
);
+ assert!(!props.bloom_filter_enabled(&ColumnPath::from("col")));
+ assert_eq!(props.bloom_filter_fpp(&ColumnPath::from("col")), 0.01);
+ assert_eq!(props.bloom_filter_ndv(&ColumnPath::from("col")), None);
+ assert_eq!(
+ props.bloom_filter_max_bytes(&ColumnPath::from("col")),
+ 1024 * 1024
+ );
}
#[test]
@@ -784,6 +902,10 @@ mod tests {
EnabledStatistics::Chunk,
)
.set_column_max_statistics_size(ColumnPath::from("col"), 123)
+ .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
+ .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100)
+ .set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
+ .set_column_bloom_filter_max_bytes(ColumnPath::from("col"), 1000)
.build();
assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
@@ -858,6 +980,7 @@ mod tests {
.build();
assert_eq!(props.codec_options(), &codec_options);
+ assert!(!props.read_bloom_filter());
}
#[test]
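On the reader side, a minimal sketch of opting in to bloom filter reads. `ReaderProperties::builder()` is assumed as the entry point, matching the builder pattern exercised in the tests above:

    use parquet::file::properties::ReaderProperties;

    // Reading bloom filters is off by default; enable it explicitly.
    let props = ReaderProperties::builder()
        .set_read_bloom_filter(true)
        .build();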
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 325944c21..bb82f2299 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -21,7 +21,6 @@
use bytes::Bytes;
use std::{boxed::Box, io::Read, sync::Arc};
-#[cfg(feature = "bloom")]
use crate::bloom_filter::Sbbf;
use crate::column::page::PageIterator;
use crate::column::{page::PageReader, reader::ColumnReader};
@@ -145,9 +144,9 @@ pub trait RowGroupReader: Send + Sync {
Ok(col_reader)
}
- #[cfg(feature = "bloom")]
- /// Get bloom filter for the `i`th column chunk, if present.
- fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;
+ /// Get bloom filter for the `i`th column chunk, if present and the reader was configured
+ /// to read bloom filters.
+ fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
/// Get iterator of `Row`s from this row group.
///
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index cb39dd194..84768aa23 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -20,10 +20,10 @@
use std::collections::VecDeque;
use std::io::Cursor;
+use std::iter;
use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
use crate::basic::{Encoding, Type};
-#[cfg(feature = "bloom")]
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
@@ -329,7 +329,7 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
f,
row_group_metadata,
props,
- )))
+ )?))
}
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
@@ -342,6 +342,7 @@ pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
chunk_reader: Arc<R>,
metadata: &'a RowGroupMetaData,
props: ReaderPropertiesPtr,
+ bloom_filters: Vec<Option<Sbbf>>,
}
impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
@@ -350,12 +351,22 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
chunk_reader: Arc<R>,
metadata: &'a RowGroupMetaData,
props: ReaderPropertiesPtr,
- ) -> Self {
- Self {
+ ) -> Result<Self> {
+ let bloom_filters = if props.read_bloom_filter() {
+ metadata
+ .columns()
+ .iter()
+ .map(|col| Sbbf::read_from_column_chunk(col, chunk_reader.clone()))
+ .collect::<Result<Vec<_>>>()?
+ } else {
+ iter::repeat(None).take(metadata.columns().len()).collect()
+ };
+ Ok(Self {
chunk_reader,
metadata,
props,
- }
+ bloom_filters,
+ })
}
}
@@ -388,11 +399,9 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
)?))
}
- #[cfg(feature = "bloom")]
/// get bloom filter for the `i`th column
- fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>> {
- let col = self.metadata.column(i);
- Sbbf::read_from_column_chunk(col, self.chunk_reader.clone())
+ fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
+ self.bloom_filters[i].as_ref()
}
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 2fe0b26e7..3f1731687 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,10 +18,10 @@
//! Contains file writer API, and provides methods to write row groups and columns by
//! using row group writers and column writers respectively.
-use std::{io::Write, sync::Arc};
-
+use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use std::{io::Write, sync::Arc};
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
use crate::basic::PageType;
@@ -92,6 +92,7 @@ pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()
pub type OnCloseRowGroup<'a> = Box<
dyn FnOnce(
RowGroupMetaDataPtr,
+ Vec<Option<Sbbf>>,
Vec<Option<ColumnIndex>>,
Vec<Option<OffsetIndex>>,
) -> Result<()>
@@ -116,6 +117,7 @@ pub struct SerializedFileWriter<W: Write> {
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
row_groups: Vec<RowGroupMetaDataPtr>,
+ bloom_filters: Vec<Vec<Option<Sbbf>>>,
column_indexes: Vec<Vec<Option<ColumnIndex>>>,
offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
row_group_index: usize,
@@ -132,6 +134,7 @@ impl<W: Write> SerializedFileWriter<W> {
descr: Arc::new(SchemaDescriptor::new(schema)),
props: properties,
row_groups: vec![],
+ bloom_filters: vec![],
column_indexes: Vec::new(),
offset_indexes: Vec::new(),
row_group_index: 0,
@@ -149,10 +152,15 @@ impl<W: Write> SerializedFileWriter<W> {
self.row_group_index += 1;
let row_groups = &mut self.row_groups;
+ let row_bloom_filters = &mut self.bloom_filters;
let row_column_indexes = &mut self.column_indexes;
let row_offset_indexes = &mut self.offset_indexes;
- let on_close = |metadata, row_group_column_index, row_group_offset_index| {
+ let on_close = |metadata,
+ row_group_bloom_filter,
+ row_group_column_index,
+ row_group_offset_index| {
row_groups.push(metadata);
+ row_bloom_filters.push(row_group_bloom_filter);
row_column_indexes.push(row_group_column_index);
row_offset_indexes.push(row_group_offset_index);
Ok(())
@@ -212,6 +220,31 @@ impl<W: Write> SerializedFileWriter<W> {
Ok(())
}
+ /// Serialize all the bloom filters to the file
+ fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+ // iter row group
+ // iter each column
+ // write bloom filter to the file
+ for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+ for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
+ match &self.bloom_filters[row_group_idx][column_idx] {
+ Some(bloom_filter) => {
+ let start_offset = self.buf.bytes_written();
+ bloom_filter.write(&mut self.buf)?;
+ // set offset and index for bloom filter
+ column_chunk
+ .meta_data
+ .as_mut()
+ .expect("can't have bloom filter without column metadata")
+ .bloom_filter_offset = Some(start_offset as i64);
+ }
+ None => {}
+ }
+ }
+ }
+ Ok(())
+ }
+
/// Serialize all the column index to the file
fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
// iter row group
@@ -250,6 +283,7 @@ impl<W: Write> SerializedFileWriter<W> {
.map(|v| v.to_thrift())
.collect::<Vec<_>>();
+ self.write_bloom_filters(&mut row_groups)?;
// Write column indexes and offset indexes
self.write_column_indexes(&mut row_groups)?;
self.write_offset_indexes(&mut row_groups)?;
@@ -320,6 +354,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
column_index: usize,
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaData>,
+ bloom_filters: Vec<Option<Sbbf>>,
column_indexes: Vec<Option<ColumnIndex>>,
offset_indexes: Vec<Option<OffsetIndex>>,
on_close: Option<OnCloseRowGroup<'a>>,
@@ -348,6 +383,7 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
column_index: 0,
row_group_metadata: None,
column_chunks: Vec::with_capacity(num_columns),
+ bloom_filters: Vec::with_capacity(num_columns),
column_indexes: Vec::with_capacity(num_columns),
offset_indexes: Vec::with_capacity(num_columns),
total_bytes_written: 0,
@@ -380,11 +416,13 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
let column_chunks = &mut self.column_chunks;
let column_indexes = &mut self.column_indexes;
let offset_indexes = &mut self.offset_indexes;
+ let bloom_filters = &mut self.bloom_filters;
let on_close = |r: ColumnCloseResult| {
// Update row group writer metrics
*total_bytes_written += r.bytes_written;
column_chunks.push(r.metadata);
+ bloom_filters.push(r.bloom_filter);
column_indexes.push(r.column_index);
offset_indexes.push(r.offset_index);
@@ -443,6 +481,7 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
if let Some(on_close) = self.on_close.take() {
on_close(
metadata,
+ self.bloom_filters.clone(),
self.column_indexes.clone(),
self.offset_indexes.clone(),
)?
@@ -623,6 +662,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
Ok(spec)
}
+
fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
metadata
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index cd29d02f8..4cdba1dc5 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -84,7 +84,6 @@ pub mod arrow;
pub mod column;
experimental!(mod compression);
experimental!(mod encodings);
-#[cfg(feature = "bloom")]
pub mod bloom_filter;
pub mod file;
pub mod record;
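Putting the writer pieces together: on close, `write_bloom_filters` runs before the column and offset indexes, writing each filter as an uncompressed thrift header followed by the little-endian bitset, and recording its start offset in the column chunk's `bloom_filter_offset`. A hedged end-to-end sketch, assuming an existing `schema` and the bloom-filter-enabled `props` from the earlier examples:

    use std::sync::Arc;
    use parquet::file::writer::SerializedFileWriter;

    let file = std::fs::File::create("/tmp/bloom.parquet").unwrap();
    let mut writer = SerializedFileWriter::new(file, schema, Arc::new(props)).unwrap();
    let mut row_group = writer.next_row_group().unwrap();
    // ... write column data via row_group.next_column() ...
    row_group.close().unwrap();
    writer.close().unwrap(); // bloom filters are serialized here, before the indexes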