You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/24 10:16:24 UTC

[arrow-rs] branch master updated: bloom filter part IV: adjust writer properties, bloom filter properties, and incorporate into column encoder (#3165)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5640a5b0c bloom filter part IV: adjust writer properties, bloom filter properties, and incorporate into column encoder (#3165)
5640a5b0c is described below

commit 5640a5b0c7d456a68c7c0cc562425ccf5494ecec
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Thu Nov 24 18:16:18 2022 +0800

    bloom filter part IV: adjust writer properties, bloom filter properties, and incorporate into column encoder (#3165)
    
    * rework bloom filter
    
    1. update number of properties
    2. push down hashing to encoder level
    3. add more docs
    
    * move bloom filter
    
    * update prompt
    
    * remove unused updates
---
 parquet/src/arrow/array_reader/mod.rs        |   2 +-
 parquet/src/arrow/arrow_writer/byte_array.rs |   6 +
 parquet/src/bin/parquet-fromcsv-help.txt     |   9 +-
 parquet/src/bin/parquet-fromcsv.rs           |  22 +-
 parquet/src/bin/parquet-show-bloom-filter.rs |   2 +-
 parquet/src/bloom_filter/mod.rs              |  43 ++--
 parquet/src/column/writer/encoder.rs         |  32 +++
 parquet/src/column/writer/mod.rs             |  22 +-
 parquet/src/file/properties.rs               | 297 +++++++++++++++++----------
 9 files changed, 284 insertions(+), 151 deletions(-)

diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index aede5e86c..f46f6073a 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -102,7 +102,7 @@ pub trait RowGroupCollection {
     /// Get schema of parquet file.
     fn schema(&self) -> SchemaDescPtr;
 
-    /// Get the numer of rows in this collection
+    /// Get the number of rows in this collection
     fn num_rows(&self) -> usize;
 
     /// Returns an iterator over the column chunks for particular column
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index d52317852..d870ac54f 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -17,6 +17,7 @@
 
 use crate::arrow::arrow_writer::levels::LevelInfo;
 use crate::basic::Encoding;
+use crate::bloom_filter::Sbbf;
 use crate::column::page::PageWriter;
 use crate::column::writer::encoder::{
     ColumnValueEncoder, DataPageValues, DictionaryPage,
@@ -451,6 +452,11 @@ impl ColumnValueEncoder for ByteArrayEncoder {
         }
     }
 
+    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
+        // TODO FIX ME need to handle bloom filter in arrow writer
+        None
+    }
+
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
     where
         Self: Sized,
diff --git a/parquet/src/bin/parquet-fromcsv-help.txt b/parquet/src/bin/parquet-fromcsv-help.txt
index f599a13f0..ec7eb0cc1 100644
--- a/parquet/src/bin/parquet-fromcsv-help.txt
+++ b/parquet/src/bin/parquet-fromcsv-help.txt
@@ -37,10 +37,10 @@ Options:
           [possible values: lf, crlf, cr]
 
   -e, --escape-char <ESCAPE_CHAR>
-          escape charactor
+          escape character
 
   -q, --quote-char <QUOTE_CHAR>
-          quate charactor
+          quote character
 
   -D, --double-quote <DOUBLE_QUOTE>
           double quote
@@ -58,6 +58,11 @@ Options:
   -m, --max-row-group-size <MAX_ROW_GROUP_SIZE>
           max row group size
 
+      --enable-bloom-filter <ENABLE_BLOOM_FILTER>
+          whether to enable bloom filter writing
+          
+          [possible values: true, false]
+
       --help
           display usage help
 
diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs
index 5fdece7cc..b11f3406c 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -57,11 +57,11 @@
 //!
 //! - `-i`, `--input-file` : Path to input CSV file
 //! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`.
-//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format`
-//! - `-e`, `--escape` : Escape charactor for input file
+//! - `-d`, `--delimiter` : Field delimiter for CSV file, default depends on `--input-format`
+//! - `-e`, `--escape` : Escape character for input file
 //! - `-h`, `--has-header` : Input has header
-//! - `-r`, `--record-terminator` : Record terminator charactor for input. default is CRLF
-//! - `-q`, `--quote-char` : Input quoting charactor
+//! - `-r`, `--record-terminator` : Record terminator character for input. default is CRLF
+//! - `-q`, `--quote-char` : Input quoting character
 //!
 
 use std::{
@@ -182,9 +182,9 @@ struct Args {
     delimiter: Option<char>,
     #[clap(value_enum, short, long, help("record terminator"))]
     record_terminator: Option<RecordTerminator>,
-    #[clap(short, long, help("escape charactor"))]
+    #[clap(short, long, help("escape character"))]
     escape_char: Option<char>,
-    #[clap(short, long, help("quate charactor"))]
+    #[clap(short, long, help("quote character"))]
     quote_char: Option<char>,
     #[clap(short('D'), long, help("double quote"))]
     double_quote: Option<bool>,
@@ -197,6 +197,8 @@ struct Args {
     writer_version: Option<WriterVersion>,
     #[clap(short, long, help("max row group size"))]
     max_row_group_size: Option<usize>,
+    #[clap(long, help("whether to enable bloom filter writing"))]
+    enable_bloom_filter: Option<bool>,
 
     #[clap(long, action=clap::ArgAction::Help, help("display usage help"))]
     help: Option<bool>,
@@ -290,6 +292,10 @@ fn configure_writer_properties(args: &Args) -> WriterProperties {
         properties_builder =
             properties_builder.set_max_row_group_size(max_row_group_size);
     }
+    if let Some(enable_bloom_filter) = args.enable_bloom_filter {
+        properties_builder =
+            properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
+    }
     properties_builder.build()
 }
 
@@ -548,6 +554,7 @@ mod tests {
             parquet_compression: Compression::SNAPPY,
             writer_version: None,
             max_row_group_size: None,
+            enable_bloom_filter: None,
             help: None,
         };
         let arrow_schema = Arc::new(Schema::new(vec![
@@ -582,6 +589,7 @@ mod tests {
             parquet_compression: Compression::SNAPPY,
             writer_version: None,
             max_row_group_size: None,
+            enable_bloom_filter: None,
             help: None,
         };
         let arrow_schema = Arc::new(Schema::new(vec![
@@ -636,6 +644,8 @@ mod tests {
             parquet_compression: Compression::SNAPPY,
             writer_version: None,
             max_row_group_size: None,
+            // by default we shall test bloom filter writing
+            enable_bloom_filter: Some(true),
             help: None,
         };
         convert_csv_to_parquet(&args).unwrap();
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index 28493a94c..55ecb2abf 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -83,7 +83,7 @@ fn main() {
                     println!(
                         "Value {} is {} in bloom filter",
                         value,
-                        if sbbf.check(value.as_str()) {
+                        if sbbf.check(&value.as_str()) {
                             "present"
                         } else {
                             "absent"
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 4efba3834..15c38cf59 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 //! Bloom filter implementation specific to Parquet, as described
-//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
+//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md).
 
 use crate::data_type::AsBytes;
 use crate::errors::ParquetError;
@@ -35,7 +35,7 @@ use thrift::protocol::{
 };
 use twox_hash::XxHash64;
 
-/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach)
+/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
 const SALT: [u32; 8] = [
     0x47b6137b_u32,
     0x44974d91_u32,
@@ -83,7 +83,9 @@ fn block_check(block: &Block, hash: u32) -> bool {
     true
 }
 
-/// A split block Bloom filter
+/// A split block Bloom filter. The creation of this structure is based on the
+/// [`crate::file::properties::BloomFilterProperties`] struct set via [`crate::file::properties::WriterProperties`] and
+/// is thus hidden by default.
 #[derive(Debug, Clone)]
 pub struct Sbbf(Vec<Block>);
 
@@ -118,8 +120,8 @@ fn read_bloom_filter_header_and_length(
     ))
 }
 
-const BITSET_MIN_LENGTH: usize = 32;
-const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
+pub(crate) const BITSET_MIN_LENGTH: usize = 32;
+pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
 
 #[inline]
 fn optimal_num_of_bytes(num_bytes: usize) -> usize {
@@ -141,15 +143,20 @@ fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
 impl Sbbf {
     /// Create a new [Sbbf] with given number of distinct values and false positive probability.
     /// Will panic if `fpp` is greater than 1.0 or less than 0.0.
-    pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
-        assert!((0.0..-1.0).contains(&fpp), "invalid fpp: {}", fpp);
+    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
+        if !(0.0..1.0).contains(&fpp) {
+            return Err(ParquetError::General(format!(
+                "False positive probability must be between 0.0 and 1.0, got {}",
+                fpp
+            )));
+        }
         let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
-        Self::new_with_num_of_bytes(num_bits / 8)
+        Ok(Self::new_with_num_of_bytes(num_bits / 8))
     }
 
     /// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
-    /// to the next power of two bounded by `BITSET_MIN_LENGTH` and `BITSET_MAX_LENGTH`.
-    pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+    /// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
+    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
         let num_bytes = optimal_num_of_bytes(num_bytes);
         let bitset = vec![0_u8; num_bytes];
         Self::new(&bitset)
@@ -170,7 +177,7 @@ impl Sbbf {
     }
 
     /// Write the bloom filter data (header and then bitset) to the output
-    pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
         let mut protocol = TCompactOutputProtocol::new(&mut writer);
         let header = self.header();
         header.write_to_out_protocol(&mut protocol).map_err(|e| {
@@ -208,7 +215,7 @@ impl Sbbf {
     }
 
     /// Read a new bloom filter from the given offset in the given reader.
-    pub fn read_from_column_chunk<R: ChunkReader>(
+    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
         reader: Arc<R>,
     ) -> Result<Option<Self>, ParquetError> {
@@ -254,7 +261,7 @@ impl Sbbf {
     }
 
     /// Insert an [AsBytes] value into the filter
-    pub fn insert<T: AsBytes>(&mut self, value: T) {
+    pub fn insert<T: AsBytes>(&mut self, value: &T) {
         self.insert_hash(hash_as_bytes(value));
     }
 
@@ -266,7 +273,7 @@ impl Sbbf {
     }
 
     /// Check if an [AsBytes] value is probably present or definitely absent in the filter
-    pub fn check<T: AsBytes>(&self, value: T) -> bool {
+    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
         self.check_hash(hash_as_bytes(value))
     }
 
@@ -284,7 +291,7 @@ impl Sbbf {
 const SEED: u64 = 0;
 
 #[inline]
-fn hash_as_bytes<A: AsBytes>(value: A) -> u64 {
+fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
     let mut hasher = XxHash64::with_seed(SEED);
     hasher.write(value.as_bytes());
     hasher.finish()
@@ -324,8 +331,8 @@ mod tests {
     fn test_sbbf_insert_and_check() {
         let mut sbbf = Sbbf(vec![[0_u32; 8]; 1_000]);
         for i in 0..1_000_000 {
-            sbbf.insert(i);
-            assert!(sbbf.check(i));
+            sbbf.insert(&i);
+            assert!(sbbf.check(&i));
         }
     }
 
@@ -339,7 +346,7 @@ mod tests {
         let sbbf = Sbbf::new(bitset);
         for a in 0..10i64 {
             let value = format!("a{}", a);
-            assert!(sbbf.check(value.as_str()));
+            assert!(sbbf.check(&value.as_str()));
         }
     }
 
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index 22cc71f6c..0d0716d7a 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::basic::Encoding;
+use crate::bloom_filter::Sbbf;
 use crate::column::writer::{
     compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max,
     update_min,
@@ -24,6 +25,7 @@ use crate::data_type::private::ParquetValueType;
 use crate::data_type::DataType;
 use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
 use crate::errors::{ParquetError, Result};
+use crate::file::properties::BloomFilterProperties;
 use crate::file::properties::{EnabledStatistics, WriterProperties};
 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
 use crate::util::memory::ByteBufferPtr;
@@ -115,6 +117,11 @@ pub trait ColumnValueEncoder {
 
     /// Flush the next data page for this column chunk
     fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;
+
+    /// Flushes bloom filter if enabled and returns it, otherwise returns `None`. Subsequent writes
+    /// will *not* be tracked, because the filter is taken out of the encoder. This should be called
+    /// once near the end of encoding.
+    fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
 }
 
 pub struct ColumnValueEncoderImpl<T: DataType> {
@@ -125,6 +132,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
     statistics_enabled: EnabledStatistics,
     min_value: Option<T::T>,
     max_value: Option<T::T>,
+    bloom_filter: Option<Sbbf>,
 }
 
 impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -136,6 +144,13 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
             }
         }
 
+        // encode the values into bloom filter if enabled
+        if let Some(bloom_filter) = &mut self.bloom_filter {
+            for value in slice {
+                bloom_filter.insert(value);
+            }
+        }
+
         match &mut self.dict_encoder {
             Some(encoder) => encoder.put(slice),
             _ => self.encoder.put(slice),
@@ -161,6 +176,10 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
         }
     }
 
+    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
+        self.bloom_filter.take()
+    }
+
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
         let dict_supported = props.dictionary_enabled(descr.path())
             && has_dictionary_support(T::get_physical_type(), props);
@@ -175,12 +194,25 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
 
         let statistics_enabled = props.statistics_enabled(descr.path());
 
+        let bloom_filter_enabled = props.bloom_filter_enabled(descr.path());
+        let bloom_filter =
+            if let Some(BloomFilterProperties { ndv, fpp }) = bloom_filter_enabled {
+                Sbbf::new_with_ndv_fpp(ndv, fpp)
+                    .map_err(|e| {
+                        eprintln!("invalid bloom filter properties: {}", e);
+                    })
+                    .ok()
+            } else {
+                None
+            };
+
         Ok(Self {
             encoder,
             dict_encoder,
             descr: descr.clone(),
             num_values: 0,
             statistics_enabled,
+            bloom_filter,
             min_value: None,
             max_value: None,
         })
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index ae7920e22..40f8c9940 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -212,10 +212,6 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     def_levels_sink: Vec<i16>,
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
-
-    // bloom filter
-    bloom_filter: Option<Sbbf>,
-
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: OffsetIndexBuilder,
@@ -238,19 +234,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // Used for level information
         encodings.insert(Encoding::RLE);
 
-        let bloom_filter_enabled = props.bloom_filter_enabled(descr.path());
-        let bloom_filter = if bloom_filter_enabled {
-            if let Some(ndv) = props.bloom_filter_ndv(descr.path()) {
-                let fpp = props.bloom_filter_fpp(descr.path());
-                Some(Sbbf::new_with_ndv_fpp(ndv, fpp))
-            } else {
-                let max_bytes = props.bloom_filter_max_bytes(descr.path());
-                Some(Sbbf::new_with_num_of_bytes(max_bytes as usize))
-            }
-        } else {
-            None
-        };
-
         Self {
             descr,
             props,
@@ -280,7 +263,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_column_nulls: 0,
                 column_distinct_count: None,
             },
-            bloom_filter,
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -454,7 +436,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         &self.descr
     }
 
-    /// Finalises writes and closes the column writer.
+    /// Finalizes writes and closes the column writer.
     /// Returns total bytes written, total rows written and column chunk metadata.
     pub fn close(mut self) -> Result<ColumnCloseResult> {
         if self.page_metrics.num_buffered_values > 0 {
@@ -479,7 +461,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         Ok(ColumnCloseResult {
             bytes_written: self.column_metrics.total_bytes_written,
             rows_written: self.column_metrics.total_rows_written,
-            bloom_filter: self.bloom_filter,
+            bloom_filter: self.encoder.flush_bloom_filter(),
             metadata,
             column_index,
             offset_index,
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 03117d4cb..6d30be2e4 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -64,7 +64,6 @@
 //!     .build();
 //! ```
 
-use paste::paste;
 use std::{collections::HashMap, sync::Arc};
 
 use crate::basic::{Compression, Encoding};
@@ -83,9 +82,10 @@ const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
 const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
 const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
 const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
-const DEFAULT_BLOOM_FILTER_ENABLED: bool = false;
-const DEFAULT_BLOOM_FILTER_MAX_BYTES: u32 = 1024 * 1024;
-const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.01;
+/// default value for the false positive probability used in a bloom filter.
+pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
+/// default value for the expected number of distinct values used in a bloom filter.
+pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
 
 /// Parquet writer version.
 ///
@@ -129,26 +129,6 @@ pub struct WriterProperties {
     sorting_columns: Option<Vec<SortingColumn>>,
 }
 
-macro_rules! def_col_property_getter {
-    ($field:ident, $field_type:ty) => {
-        pub fn $field(&self, col: &ColumnPath) -> Option<$field_type> {
-            self.column_properties
-                .get(col)
-                .and_then(|c| c.$field())
-                .or_else(|| self.default_column_properties.$field())
-        }
-    };
-    ($field:ident, $field_type:ty, $default_val:expr) => {
-        pub fn $field(&self, col: &ColumnPath) -> $field_type {
-            self.column_properties
-                .get(col)
-                .and_then(|c| c.$field())
-                .or_else(|| self.default_column_properties.$field())
-                .unwrap_or($default_val)
-        }
-    };
-}
-
 impl WriterProperties {
     /// Returns builder for writer properties with default values.
     pub fn builder() -> WriterPropertiesBuilder {
@@ -280,10 +260,17 @@ impl WriterProperties {
             .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE)
     }
 
-    def_col_property_getter!(bloom_filter_enabled, bool, DEFAULT_BLOOM_FILTER_ENABLED);
-    def_col_property_getter!(bloom_filter_fpp, f64, DEFAULT_BLOOM_FILTER_FPP);
-    def_col_property_getter!(bloom_filter_ndv, u64);
-    def_col_property_getter!(bloom_filter_max_bytes, u32, DEFAULT_BLOOM_FILTER_MAX_BYTES);
+    /// Returns the bloom filter properties for a given column, or `None` if bloom filter is disabled.
+    /// Bloom filter can be enabled globally or for a specific column, and is disabled by default.
+    pub fn bloom_filter_enabled(
+        &self,
+        col: &ColumnPath,
+    ) -> Option<BloomFilterProperties> {
+        self.column_properties
+            .get(col)
+            .and_then(|c| c.bloom_filter_enabled())
+            .or_else(|| self.default_column_properties.bloom_filter_enabled())
+    }
 }
 
 /// Writer properties builder.
@@ -301,52 +288,6 @@ pub struct WriterPropertiesBuilder {
     sorting_columns: Option<Vec<SortingColumn>>,
 }
 
-macro_rules! def_opt_field_setter {
-    ($field: ident, $type: ty) => {
-        paste! {
-            pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
-                self.$field = Some(value);
-                self
-            }
-        }
-    };
-    ($field: ident, $type: ty, $min_value:expr, $max_value:expr) => {
-        paste! {
-            pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
-                if ($min_value..=$max_value).contains(&value) {
-                    self.$field = Some(value);
-                } else {
-                    self.$field = None
-                }
-                self
-            }
-        }
-    };
-}
-
-macro_rules! def_opt_field_getter {
-    ($field: ident, $type: ty) => {
-        paste! {
-            #[doc = "Returns " $field " if set."]
-            pub fn $field(&self) -> Option<$type> {
-                self.$field
-            }
-        }
-    };
-}
-
-macro_rules! def_per_col_setter {
-    ($field:ident, $field_type:ty) => {
-        paste! {
-            #[doc = "Sets " $field " for a column. Takes precedence over globally defined settings."]
-            pub fn [<set_column_ $field>](mut self, col: ColumnPath, value: $field_type) -> Self {
-                self.get_mut_props(col).[<set_ $field>](value);
-                self
-            }
-        }
-    }
-}
-
 impl WriterPropertiesBuilder {
     /// Returns default state of the builder.
     fn with_defaults() -> Self {
@@ -506,6 +447,30 @@ impl WriterPropertiesBuilder {
         self
     }
 
+    /// Sets whether bloom filter is enabled for any column.
+    /// If `value` is `true` and bloom filter is already enabled, this is a no-op; otherwise default
+    /// ndv and fpp values will be used. If `value` is `false`, bloom filter is disabled.
+    /// You can use [`set_bloom_filter_ndv`](Self::set_bloom_filter_ndv) and [`set_bloom_filter_fpp`](Self::set_bloom_filter_fpp) to further adjust the ndv and fpp.
+    pub fn set_bloom_filter_enabled(mut self, value: bool) -> Self {
+        self.default_column_properties
+            .set_bloom_filter_enabled(value);
+        self
+    }
+
+    /// Sets bloom filter false positive probability (fpp) for any column.
+    /// Implicitly [`set_bloom_filter_enabled`](Self::set_bloom_filter_enabled).
+    pub fn set_bloom_filter_fpp(mut self, value: f64) -> Self {
+        self.default_column_properties.set_bloom_filter_fpp(value);
+        self
+    }
+
+    /// Sets number of distinct values (ndv) for bloom filter for any column.
+    /// Implicitly [`set_bloom_filter_enabled`](Self::set_bloom_filter_enabled).
+    pub fn set_bloom_filter_ndv(mut self, value: u64) -> Self {
+        self.default_column_properties.set_bloom_filter_ndv(value);
+        self
+    }
+
     // ----------------------------------------------------------------------
     // Setters for a specific column
 
@@ -568,10 +533,33 @@ impl WriterPropertiesBuilder {
         self
     }
 
-    def_per_col_setter!(bloom_filter_enabled, bool);
-    def_per_col_setter!(bloom_filter_fpp, f64);
-    def_per_col_setter!(bloom_filter_max_bytes, u32);
-    def_per_col_setter!(bloom_filter_ndv, u64);
+    /// Sets whether a bloom filter should be created for a specific column.
+    /// The behavior is similar to [`set_bloom_filter_enabled`](Self::set_bloom_filter_enabled).
+    /// Takes precedence over globally defined settings.
+    pub fn set_column_bloom_filter_enabled(
+        mut self,
+        col: ColumnPath,
+        value: bool,
+    ) -> Self {
+        self.get_mut_props(col).set_bloom_filter_enabled(value);
+        self
+    }
+
+    /// Sets the false positive probability for bloom filter for a specific column.
+    /// The behavior is similar to [`set_bloom_filter_fpp`](Self::set_bloom_filter_fpp) but will
+    /// override the default.
+    pub fn set_column_bloom_filter_fpp(mut self, col: ColumnPath, value: f64) -> Self {
+        self.get_mut_props(col).set_bloom_filter_fpp(value);
+        self
+    }
+
+    /// Sets the number of distinct values for bloom filter for a specific column.
+    /// The behavior is similar to [`set_bloom_filter_ndv`](Self::set_bloom_filter_ndv) but will
+    /// override the default.
+    pub fn set_column_bloom_filter_ndv(mut self, col: ColumnPath, value: u64) -> Self {
+        self.get_mut_props(col).set_bloom_filter_ndv(value);
+        self
+    }
 }
 
 /// Controls the level of statistics to be computed by the writer
@@ -591,6 +579,43 @@ impl Default for EnabledStatistics {
     }
 }
 
+/// Controls the bloom filter to be computed by the writer.
+#[derive(Debug, Clone, PartialEq)]
+pub struct BloomFilterProperties {
+    /// False positive probability, should be always between 0 and 1 exclusive. Defaults to [`DEFAULT_BLOOM_FILTER_FPP`].
+    ///
+    /// You should set this value by calling [`WriterPropertiesBuilder::set_bloom_filter_fpp`].
+    ///
+    /// The bloom filter data structure is a trade-off between disk and memory space versus fpp; the
+    /// smaller the fpp, the more memory and disk space is required, thus setting it to a reasonable value
+    /// e.g. 0.1, 0.05, or 0.001 is recommended.
+    ///
+    /// Setting it to a very small number diminishes the value of the filter itself, as the bitset size is
+    /// even larger than just storing the whole value. You are also expected to set `ndv` if it can
+    /// be known in advance in order to largely reduce space usage.
+    pub fpp: f64,
+    /// Number of distinct values, should be non-negative to be meaningful. Defaults to [`DEFAULT_BLOOM_FILTER_NDV`].
+    ///
+    /// You should set this value by calling [`WriterPropertiesBuilder::set_bloom_filter_ndv`].
+    ///
+    /// Usage of bloom filter is most beneficial for columns with large cardinality, so a good heuristic
+    /// is to set ndv to number of rows. However it can reduce disk size if you know in advance a smaller
+    /// number of distinct values. For very small ndv value it is probably not worth it to use bloom filter
+    /// anyway.
+    ///
+    /// Increasing this value (without increasing fpp) will result in an increase in disk or memory size.
+    pub ndv: u64,
+}
+
+impl Default for BloomFilterProperties {
+    fn default() -> Self {
+        BloomFilterProperties {
+            fpp: DEFAULT_BLOOM_FILTER_FPP,
+            ndv: DEFAULT_BLOOM_FILTER_NDV,
+        }
+    }
+}
+
 /// Container for column properties that can be changed as part of writer.
 ///
 /// If a field is `None`, it means that no specific value has been set for this column,
@@ -602,14 +627,8 @@ struct ColumnProperties {
     dictionary_enabled: Option<bool>,
     statistics_enabled: Option<EnabledStatistics>,
     max_statistics_size: Option<usize>,
-    /// bloom filter enabled
-    bloom_filter_enabled: Option<bool>,
-    /// bloom filter expected number of distinct values
-    bloom_filter_ndv: Option<u64>,
-    /// bloom filter false positive probability
-    bloom_filter_fpp: Option<f64>,
-    /// bloom filter max number of bytes
-    bloom_filter_max_bytes: Option<u32>,
+    /// bloom filter related properties
+    bloom_filter_enabled: Option<BloomFilterProperties>,
 }
 
 impl ColumnProperties {
@@ -649,10 +668,45 @@ impl ColumnProperties {
         self.max_statistics_size = Some(value);
     }
 
-    def_opt_field_setter!(bloom_filter_enabled, bool);
-    def_opt_field_setter!(bloom_filter_fpp, f64, 0.0, 1.0);
-    def_opt_field_setter!(bloom_filter_max_bytes, u32);
-    def_opt_field_setter!(bloom_filter_ndv, u64);
+    /// If `value` is `true`, sets bloom filter properties to default values if not previously set,
+    /// otherwise it is a no-op.
+    /// If `value` is `false`, resets bloom filter properties to `None`.
+    fn set_bloom_filter_enabled(&mut self, value: bool) {
+        if value {
+            self.bloom_filter_enabled = self
+                .bloom_filter_enabled()
+                .or_else(|| Some(Default::default()));
+        } else {
+            self.bloom_filter_enabled = None;
+        }
+    }
+
+    /// Sets the false positive probability for bloom filter for this column, and implicitly enables
+    /// bloom filter if not previously enabled. If `value` is outside the half-open range `[0.0, 1.0)`,
+    /// it is ignored and the call is a no-op.
+    fn set_bloom_filter_fpp(&mut self, value: f64) {
+        if (0.0..1.0).contains(&value) {
+            self.bloom_filter_enabled = self
+                .bloom_filter_enabled()
+                .or_else(|| Some(Default::default()))
+                .map(|BloomFilterProperties { ndv, .. }| BloomFilterProperties {
+                    ndv,
+                    fpp: value,
+                });
+        }
+    }
+
+    /// Sets the number of distinct (unique) values for bloom filter for this column, and implicitly
+    /// enables bloom filter if not previously enabled.
+    fn set_bloom_filter_ndv(&mut self, value: u64) {
+        self.bloom_filter_enabled = self
+            .bloom_filter_enabled()
+            .or_else(|| Some(Default::default()))
+            .map(|BloomFilterProperties { fpp, .. }| BloomFilterProperties {
+                ndv: value,
+                fpp,
+            });
+    }
 
     /// Returns optional encoding for this column.
     fn encoding(&self) -> Option<Encoding> {
@@ -682,10 +736,10 @@ impl ColumnProperties {
         self.max_statistics_size
     }
 
-    def_opt_field_getter!(bloom_filter_enabled, bool);
-    def_opt_field_getter!(bloom_filter_fpp, f64);
-    def_opt_field_getter!(bloom_filter_max_bytes, u32);
-    def_opt_field_getter!(bloom_filter_ndv, u64);
+    /// Returns bloom filter properties if set.
+    fn bloom_filter_enabled(&self) -> Option<BloomFilterProperties> {
+        self.bloom_filter_enabled.clone()
+    }
 }
 
 /// Reference counted reader properties.
@@ -812,13 +866,9 @@ mod tests {
             props.max_statistics_size(&ColumnPath::from("col")),
             DEFAULT_MAX_STATISTICS_SIZE
         );
-        assert!(!props.bloom_filter_enabled(&ColumnPath::from("col")));
-        assert_eq!(props.bloom_filter_fpp(&ColumnPath::from("col")), 0.01);
-        assert_eq!(props.bloom_filter_ndv(&ColumnPath::from("col")), None);
-        assert_eq!(
-            props.bloom_filter_max_bytes(&ColumnPath::from("col")),
-            1024 * 1024
-        );
+        assert!(props
+            .bloom_filter_enabled(&ColumnPath::from("col"))
+            .is_none());
     }
 
     #[test]
@@ -903,9 +953,8 @@ mod tests {
             )
             .set_column_max_statistics_size(ColumnPath::from("col"), 123)
             .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
-            .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100)
+            .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100_u64)
             .set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
-            .set_column_bloom_filter_max_bytes(ColumnPath::from("col"), 1000)
             .build();
 
         assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
@@ -947,6 +996,10 @@ mod tests {
             EnabledStatistics::Chunk
         );
         assert_eq!(props.max_statistics_size(&ColumnPath::from("col")), 123);
+        assert_eq!(
+            props.bloom_filter_enabled(&ColumnPath::from("col")),
+            Some(BloomFilterProperties { fpp: 0.1, ndv: 100 })
+        );
     }
 
     #[test]
@@ -954,6 +1007,7 @@ mod tests {
         let props = WriterProperties::builder()
             .set_encoding(Encoding::DELTA_BINARY_PACKED)
             .set_compression(Compression::GZIP)
+            .set_bloom_filter_enabled(true)
             .set_column_encoding(ColumnPath::from("col"), Encoding::RLE)
             .build();
 
@@ -969,6 +1023,43 @@ mod tests {
             props.dictionary_enabled(&ColumnPath::from("col")),
             DEFAULT_DICTIONARY_ENABLED
         );
+        assert_eq!(
+            props.bloom_filter_enabled(&ColumnPath::from("col")),
+            Some(BloomFilterProperties {
+                fpp: 0.05,
+                ndv: 1_000_000_u64
+            })
+        );
+    }
+
+    #[test]
+    fn test_writer_properties_bloom_filter_ndv_fpp_set() {
+        assert_eq!(
+            WriterProperties::builder()
+                .build()
+                .bloom_filter_enabled(&ColumnPath::from("col")),
+            None
+        );
+        assert_eq!(
+            WriterProperties::builder()
+                .set_bloom_filter_ndv(100)
+                .build()
+                .bloom_filter_enabled(&ColumnPath::from("col")),
+            Some(BloomFilterProperties {
+                fpp: 0.05,
+                ndv: 100
+            })
+        );
+        assert_eq!(
+            WriterProperties::builder()
+                .set_bloom_filter_fpp(0.1)
+                .build()
+                .bloom_filter_enabled(&ColumnPath::from("col")),
+            Some(BloomFilterProperties {
+                fpp: 0.1,
+                ndv: 1_000_000_u64
+            })
+        );
     }
 
     #[test]