You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/22 12:07:03 UTC

[arrow-rs] branch master updated: parquet bloom filter part III: add sbbf writer, remove `bloom` default feature, add reader properties (#3119)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new e214ccccc parquet bloom filter part III: add sbbf writer, remove `bloom` default feature, add reader properties (#3119)
e214ccccc is described below

commit e214ccccc702d0295fbf59258a6a817cd09ac4ea
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Tue Nov 22 20:06:57 2022 +0800

    parquet bloom filter part III: add sbbf writer, remove `bloom` default feature, add reader properties (#3119)
    
    * bloom filter part III
    
    - add reader properties
    - add writer properties
    - remove `bloom` feature
    
    * update row group vec
    
    * fix clippy
    
    * fix clippy
    
    * remove default feature for twox
    
    * incorporate ndv and fpp
    
    * fix doc
    
    * add unit test
    
    * fix clippy
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * remove underflow logic
    
    * refactor write
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 parquet/Cargo.toml                           |   9 +-
 parquet/src/bin/parquet-show-bloom-filter.rs |   5 +-
 parquet/src/bloom_filter/mod.rs              | 125 +++++++++++++++++++++-
 parquet/src/column/writer/mod.rs             |  22 ++++
 parquet/src/file/metadata.rs                 |   2 +-
 parquet/src/file/properties.rs               | 151 ++++++++++++++++++++++++---
 parquet/src/file/reader.rs                   |   7 +-
 parquet/src/file/serialized_reader.rs        |  27 +++--
 parquet/src/file/writer.rs                   |  46 +++++++-
 parquet/src/lib.rs                           |   1 -
 10 files changed, 353 insertions(+), 42 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 515da585e..7a150c949 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -57,7 +57,8 @@ seq-macro = { version = "0.3", default-features = false }
 futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
 tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
 hashbrown = { version = "0.13", default-features = false }
-twox-hash = { version = "1.6", optional = true }
+twox-hash = { version = "1.6", default-features = false }
+paste = { version = "1.0" }
 
 [dev-dependencies]
 base64 = { version = "0.13", default-features = false, features = ["std"] }
@@ -77,7 +78,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
 all-features = true
 
 [features]
-default = ["arrow", "bloom", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
+default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
 # Enable arrow reader/writer APIs
 arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
 # Enable CLI tools
@@ -90,8 +91,6 @@ test_common = ["arrow/test_utils"]
 experimental = []
 # Enable async APIs
 async = ["futures", "tokio"]
-# Bloomfilter
-bloom = ["twox-hash"]
 
 [[test]]
 name = "arrow_writer_layout"
@@ -115,7 +114,7 @@ required-features = ["arrow", "cli"]
 
 [[bin]]
 name = "parquet-show-bloom-filter"
-required-features = ["cli", "bloom"]
+required-features = ["cli"]
 
 [[bench]]
 name = "arrow_writer"
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index a4dbdbe67..28493a94c 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -78,10 +78,7 @@ fn main() {
             let row_group_reader = file_reader
                 .get_row_group(ri)
                 .expect("Unable to read row group");
-            if let Some(sbbf) = row_group_reader
-                .get_column_bloom_filter(column_index)
-                .expect("Failed to parse bloom filter")
-            {
+            if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
                 args.values.iter().for_each(|value| {
                     println!(
                         "Value {} is {} in bloom filter",
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 4944a93f8..4efba3834 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -24,11 +24,15 @@ use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::reader::ChunkReader;
 use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+    SplitBlockAlgorithm, Uncompressed, XxHash,
 };
 use bytes::{Buf, Bytes};
 use std::hash::Hasher;
+use std::io::Write;
 use std::sync::Arc;
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
+use thrift::protocol::{
+    TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
+};
 use twox_hash::XxHash64;
 
 /// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach)
@@ -80,6 +84,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
 }
 
 /// A split block Bloom filter
+#[derive(Debug, Clone)]
 pub struct Sbbf(Vec<Block>);
 
 const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
@@ -113,7 +118,43 @@ fn read_bloom_filter_header_and_length(
     ))
 }
 
+const BITSET_MIN_LENGTH: usize = 32;
+const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
+
+#[inline]
+fn optimal_num_of_bytes(num_bytes: usize) -> usize {
+    let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
+    let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
+    num_bytes.next_power_of_two()
+}
+
+// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
+// given fpp = (1 - e^(-k * n / m)) ^ k
+// we have m = - k * n / ln(1 - fpp ^ (1 / k))
+// where k = number of hash functions, m = number of bits, n = number of distinct values
+#[inline]
+fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
+    let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
+    num_bits as usize
+}
+
 impl Sbbf {
+    /// Create a new [Sbbf] with given number of distinct values and false positive probability.
+    /// Will panic if `fpp` is greater than 1.0 or less than 0.0.
+    pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
+        assert!((0.0..-1.0).contains(&fpp), "invalid fpp: {}", fpp);
+        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
+        Self::new_with_num_of_bytes(num_bits / 8)
+    }
+
+    /// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
+    /// to the next power of two bounded by `BITSET_MIN_LENGTH` and `BITSET_MAX_LENGTH`.
+    pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+        let num_bytes = optimal_num_of_bytes(num_bytes);
+        let bitset = vec![0_u8; num_bytes];
+        Self::new(&bitset)
+    }
+
     fn new(bitset: &[u8]) -> Self {
         let data = bitset
             .chunks_exact(4 * 8)
@@ -128,6 +169,45 @@ impl Sbbf {
         Self(data)
     }
 
+    /// Write the bloom filter data (header and then bitset) to the output
+    pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+        let mut protocol = TCompactOutputProtocol::new(&mut writer);
+        let header = self.header();
+        header.write_to_out_protocol(&mut protocol).map_err(|e| {
+            ParquetError::General(format!("Could not write bloom filter header: {}", e))
+        })?;
+        protocol.flush()?;
+        self.write_bitset(&mut writer)?;
+        Ok(())
+    }
+
+    /// Write the bitset in serialized form to the writer.
+    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+        for block in &self.0 {
+            for word in block {
+                writer.write_all(&word.to_le_bytes()).map_err(|e| {
+                    ParquetError::General(format!(
+                        "Could not write bloom filter bit set: {}",
+                        e
+                    ))
+                })?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
+    fn header(&self) -> BloomFilterHeader {
+        BloomFilterHeader {
+            // 8 i32 per block, 4 bytes per i32
+            num_bytes: self.0.len() as i32 * 4 * 8,
+            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
+            hash: BloomFilterHash::XXHASH(XxHash {}),
+            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
+        }
+    }
+
+    /// Read a new bloom filter from the given offset in the given reader.
     pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
         reader: Arc<R>,
@@ -292,4 +372,47 @@ mod tests {
         assert_eq!(num_bytes, 32_i32);
         assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
     }
+
+    #[test]
+    fn test_optimal_num_of_bytes() {
+        for (input, expected) in &[
+            (0, 32),
+            (9, 32),
+            (31, 32),
+            (32, 32),
+            (33, 64),
+            (99, 128),
+            (1024, 1024),
+            (999_000_000, 128 * 1024 * 1024),
+        ] {
+            assert_eq!(*expected, optimal_num_of_bytes(*input));
+        }
+    }
+
+    #[test]
+    fn test_num_of_bits_from_ndv_fpp() {
+        for (fpp, ndv, num_bits) in &[
+            (0.1, 10, 57),
+            (0.01, 10, 96),
+            (0.001, 10, 146),
+            (0.1, 100, 577),
+            (0.01, 100, 968),
+            (0.001, 100, 1460),
+            (0.1, 1000, 5772),
+            (0.01, 1000, 9681),
+            (0.001, 1000, 14607),
+            (0.1, 10000, 57725),
+            (0.01, 10000, 96815),
+            (0.001, 10000, 146076),
+            (0.1, 100000, 577254),
+            (0.01, 100000, 968152),
+            (0.001, 100000, 1460769),
+            (0.1, 1000000, 5772541),
+            (0.01, 1000000, 9681526),
+            (0.001, 1000000, 14607697),
+            (1e-50, 1_000_000_000_000, 14226231280773240832),
+        ] {
+            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
+        }
+    }
 }
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3cdf04f54..ae7920e22 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -16,6 +16,8 @@
 // under the License.
 
 //! Contains column writer API.
+
+use crate::bloom_filter::Sbbf;
 use crate::format::{ColumnIndex, OffsetIndex};
 use std::collections::{BTreeSet, VecDeque};
 
@@ -154,6 +156,8 @@ pub struct ColumnCloseResult {
     pub rows_written: u64,
     /// Metadata for this column chunk
     pub metadata: ColumnChunkMetaData,
+    /// Optional bloom filter for this column
+    pub bloom_filter: Option<Sbbf>,
     /// Optional column index, for filtering
     pub column_index: Option<ColumnIndex>,
     /// Optional offset index, identifying page locations
@@ -209,6 +213,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
 
+    // bloom filter
+    bloom_filter: Option<Sbbf>,
+
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: OffsetIndexBuilder,
@@ -231,6 +238,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // Used for level information
         encodings.insert(Encoding::RLE);
 
+        let bloom_filter_enabled = props.bloom_filter_enabled(descr.path());
+        let bloom_filter = if bloom_filter_enabled {
+            if let Some(ndv) = props.bloom_filter_ndv(descr.path()) {
+                let fpp = props.bloom_filter_fpp(descr.path());
+                Some(Sbbf::new_with_ndv_fpp(ndv, fpp))
+            } else {
+                let max_bytes = props.bloom_filter_max_bytes(descr.path());
+                Some(Sbbf::new_with_num_of_bytes(max_bytes as usize))
+            }
+        } else {
+            None
+        };
+
         Self {
             descr,
             props,
@@ -260,6 +280,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_column_nulls: 0,
                 column_distinct_count: None,
             },
+            bloom_filter,
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -458,6 +479,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         Ok(ColumnCloseResult {
             bytes_written: self.column_metrics.total_bytes_written,
             rows_written: self.column_metrics.total_rows_written,
+            bloom_filter: self.bloom_filter,
             metadata,
             column_index,
             offset_index,
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 895776a8a..2ba50fa31 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -236,7 +236,7 @@ pub struct RowGroupMetaData {
 }
 
 impl RowGroupMetaData {
-    /// Returns builer for row group metadata.
+    /// Returns builder for row group metadata.
     pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
         RowGroupMetaDataBuilder::new(schema_descr)
     }
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c65ba8035..03117d4cb 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -64,6 +64,7 @@
 //!     .build();
 //! ```
 
+use paste::paste;
 use std::{collections::HashMap, sync::Arc};
 
 use crate::basic::{Compression, Encoding};
@@ -82,6 +83,9 @@ const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
 const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
 const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
 const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
+const DEFAULT_BLOOM_FILTER_ENABLED: bool = false;
+const DEFAULT_BLOOM_FILTER_MAX_BYTES: u32 = 1024 * 1024;
+const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.01;
 
 /// Parquet writer version.
 ///
@@ -125,6 +129,26 @@ pub struct WriterProperties {
     sorting_columns: Option<Vec<SortingColumn>>,
 }
 
+macro_rules! def_col_property_getter {
+    ($field:ident, $field_type:ty) => {
+        pub fn $field(&self, col: &ColumnPath) -> Option<$field_type> {
+            self.column_properties
+                .get(col)
+                .and_then(|c| c.$field())
+                .or_else(|| self.default_column_properties.$field())
+        }
+    };
+    ($field:ident, $field_type:ty, $default_val:expr) => {
+        pub fn $field(&self, col: &ColumnPath) -> $field_type {
+            self.column_properties
+                .get(col)
+                .and_then(|c| c.$field())
+                .or_else(|| self.default_column_properties.$field())
+                .unwrap_or($default_val)
+        }
+    };
+}
+
 impl WriterProperties {
     /// Returns builder for writer properties with default values.
     pub fn builder() -> WriterPropertiesBuilder {
@@ -255,6 +279,11 @@ impl WriterProperties {
             .or_else(|| self.default_column_properties.max_statistics_size())
             .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE)
     }
+
+    def_col_property_getter!(bloom_filter_enabled, bool, DEFAULT_BLOOM_FILTER_ENABLED);
+    def_col_property_getter!(bloom_filter_fpp, f64, DEFAULT_BLOOM_FILTER_FPP);
+    def_col_property_getter!(bloom_filter_ndv, u64);
+    def_col_property_getter!(bloom_filter_max_bytes, u32, DEFAULT_BLOOM_FILTER_MAX_BYTES);
 }
 
 /// Writer properties builder.
@@ -272,6 +301,52 @@ pub struct WriterPropertiesBuilder {
     sorting_columns: Option<Vec<SortingColumn>>,
 }
 
+macro_rules! def_opt_field_setter {
+    ($field: ident, $type: ty) => {
+        paste! {
+            pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
+                self.$field = Some(value);
+                self
+            }
+        }
+    };
+    ($field: ident, $type: ty, $min_value:expr, $max_value:expr) => {
+        paste! {
+            pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
+                if ($min_value..=$max_value).contains(&value) {
+                    self.$field = Some(value);
+                } else {
+                    self.$field = None
+                }
+                self
+            }
+        }
+    };
+}
+
+macro_rules! def_opt_field_getter {
+    ($field: ident, $type: ty) => {
+        paste! {
+            #[doc = "Returns " $field " if set."]
+            pub fn $field(&self) -> Option<$type> {
+                self.$field
+            }
+        }
+    };
+}
+
+macro_rules! def_per_col_setter {
+    ($field:ident, $field_type:ty) => {
+        paste! {
+            #[doc = "Sets " $field " for a column. Takes precedence over globally defined settings."]
+            pub fn [<set_column_ $field>](mut self, col: ColumnPath, value: $field_type) -> Self {
+                self.get_mut_props(col).[<set_ $field>](value);
+                self
+            }
+        }
+    }
+}
+
 impl WriterPropertiesBuilder {
     /// Returns default state of the builder.
     fn with_defaults() -> Self {
@@ -284,7 +359,7 @@ impl WriterPropertiesBuilder {
             writer_version: DEFAULT_WRITER_VERSION,
             created_by: DEFAULT_CREATED_BY.to_string(),
             key_value_metadata: None,
-            default_column_properties: ColumnProperties::new(),
+            default_column_properties: Default::default(),
             column_properties: HashMap::new(),
             sorting_columns: None,
         }
@@ -439,7 +514,7 @@ impl WriterPropertiesBuilder {
     fn get_mut_props(&mut self, col: ColumnPath) -> &mut ColumnProperties {
         self.column_properties
             .entry(col)
-            .or_insert_with(ColumnProperties::new)
+            .or_insert_with(Default::default)
     }
 
     /// Sets encoding for a column.
@@ -492,6 +567,11 @@ impl WriterPropertiesBuilder {
         self.get_mut_props(col).set_max_statistics_size(value);
         self
     }
+
+    def_per_col_setter!(bloom_filter_enabled, bool);
+    def_per_col_setter!(bloom_filter_fpp, f64);
+    def_per_col_setter!(bloom_filter_max_bytes, u32);
+    def_per_col_setter!(bloom_filter_ndv, u64);
 }
 
 /// Controls the level of statistics to be computed by the writer
@@ -515,27 +595,24 @@ impl Default for EnabledStatistics {
 ///
 /// If a field is `None`, it means that no specific value has been set for this column,
 /// so some subsequent or default value must be used.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, Default, PartialEq)]
 struct ColumnProperties {
     encoding: Option<Encoding>,
     codec: Option<Compression>,
     dictionary_enabled: Option<bool>,
     statistics_enabled: Option<EnabledStatistics>,
     max_statistics_size: Option<usize>,
+    /// bloom filter enabled
+    bloom_filter_enabled: Option<bool>,
+    /// bloom filter expected number of distinct values
+    bloom_filter_ndv: Option<u64>,
+    /// bloom filter false positive probability
+    bloom_filter_fpp: Option<f64>,
+    /// bloom filter max number of bytes
+    bloom_filter_max_bytes: Option<u32>,
 }
 
 impl ColumnProperties {
-    /// Initialise column properties with default values.
-    fn new() -> Self {
-        Self {
-            encoding: None,
-            codec: None,
-            dictionary_enabled: None,
-            statistics_enabled: None,
-            max_statistics_size: None,
-        }
-    }
-
     /// Sets encoding for this column.
     ///
     /// If dictionary is not enabled, this is treated as a primary encoding for a column.
@@ -572,6 +649,11 @@ impl ColumnProperties {
         self.max_statistics_size = Some(value);
     }
 
+    def_opt_field_setter!(bloom_filter_enabled, bool);
+    def_opt_field_setter!(bloom_filter_fpp, f64, 0.0, 1.0);
+    def_opt_field_setter!(bloom_filter_max_bytes, u32);
+    def_opt_field_setter!(bloom_filter_ndv, u64);
+
     /// Returns optional encoding for this column.
     fn encoding(&self) -> Option<Encoding> {
         self.encoding
@@ -599,17 +681,25 @@ impl ColumnProperties {
     fn max_statistics_size(&self) -> Option<usize> {
         self.max_statistics_size
     }
+
+    def_opt_field_getter!(bloom_filter_enabled, bool);
+    def_opt_field_getter!(bloom_filter_fpp, f64);
+    def_opt_field_getter!(bloom_filter_max_bytes, u32);
+    def_opt_field_getter!(bloom_filter_ndv, u64);
 }
 
 /// Reference counted reader properties.
 pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
 
+const DEFAULT_READ_BLOOM_FILTER: bool = false;
+
 /// Reader properties.
 ///
 /// All properties are immutable and `Send` + `Sync`.
 /// Use [`ReaderPropertiesBuilder`] to assemble these properties.
 pub struct ReaderProperties {
     codec_options: CodecOptions,
+    read_bloom_filter: bool,
 }
 
 impl ReaderProperties {
@@ -622,11 +712,17 @@ impl ReaderProperties {
     pub(crate) fn codec_options(&self) -> &CodecOptions {
         &self.codec_options
     }
+
+    /// Returns whether to read bloom filter
+    pub(crate) fn read_bloom_filter(&self) -> bool {
+        self.read_bloom_filter
+    }
 }
 
 /// Reader properties builder.
 pub struct ReaderPropertiesBuilder {
     codec_options_builder: CodecOptionsBuilder,
+    read_bloom_filter: Option<bool>,
 }
 
 /// Reader properties builder.
@@ -635,6 +731,7 @@ impl ReaderPropertiesBuilder {
     fn with_defaults() -> Self {
         Self {
             codec_options_builder: CodecOptionsBuilder::default(),
+            read_bloom_filter: None,
         }
     }
 
@@ -642,6 +739,9 @@ impl ReaderPropertiesBuilder {
     pub fn build(self) -> ReaderProperties {
         ReaderProperties {
             codec_options: self.codec_options_builder.build(),
+            read_bloom_filter: self
+                .read_bloom_filter
+                .unwrap_or(DEFAULT_READ_BLOOM_FILTER),
         }
     }
 
@@ -659,6 +759,17 @@ impl ReaderPropertiesBuilder {
             .set_backward_compatible_lz4(value);
         self
     }
+
+    /// Enable/disable reading bloom filter
+    ///
+    /// If reading bloom filter is enabled, bloom filter will be read from the file.
+    /// If reading bloom filter is disabled, bloom filter will not be read from the file.
+    ///
+    /// By default bloom filter is set to be read.
+    pub fn set_read_bloom_filter(mut self, value: bool) -> Self {
+        self.read_bloom_filter = Some(value);
+        self
+    }
 }
 
 #[cfg(test)]
@@ -701,6 +812,13 @@ mod tests {
             props.max_statistics_size(&ColumnPath::from("col")),
             DEFAULT_MAX_STATISTICS_SIZE
         );
+        assert!(!props.bloom_filter_enabled(&ColumnPath::from("col")));
+        assert_eq!(props.bloom_filter_fpp(&ColumnPath::from("col")), 0.01);
+        assert_eq!(props.bloom_filter_ndv(&ColumnPath::from("col")), None);
+        assert_eq!(
+            props.bloom_filter_max_bytes(&ColumnPath::from("col")),
+            1024 * 1024
+        );
     }
 
     #[test]
@@ -784,6 +902,10 @@ mod tests {
                 EnabledStatistics::Chunk,
             )
             .set_column_max_statistics_size(ColumnPath::from("col"), 123)
+            .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
+            .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100)
+            .set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
+            .set_column_bloom_filter_max_bytes(ColumnPath::from("col"), 1000)
             .build();
 
         assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
@@ -858,6 +980,7 @@ mod tests {
             .build();
 
         assert_eq!(props.codec_options(), &codec_options);
+        assert!(!props.read_bloom_filter());
     }
 
     #[test]
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 325944c21..bb82f2299 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -21,7 +21,6 @@
 use bytes::Bytes;
 use std::{boxed::Box, io::Read, sync::Arc};
 
-#[cfg(feature = "bloom")]
 use crate::bloom_filter::Sbbf;
 use crate::column::page::PageIterator;
 use crate::column::{page::PageReader, reader::ColumnReader};
@@ -145,9 +144,9 @@ pub trait RowGroupReader: Send + Sync {
         Ok(col_reader)
     }
 
-    #[cfg(feature = "bloom")]
-    /// Get bloom filter for the `i`th column chunk, if present.
-    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;
+    /// Get bloom filter for the `i`th column chunk, if present and the reader was configured
+    /// to read bloom filters.
+    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
 
     /// Get iterator of `Row`s from this row group.
     ///
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index cb39dd194..84768aa23 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -20,10 +20,10 @@
 
 use std::collections::VecDeque;
 use std::io::Cursor;
+use std::iter;
 use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
 
 use crate::basic::{Encoding, Type};
-#[cfg(feature = "bloom")]
 use crate::bloom_filter::Sbbf;
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
@@ -329,7 +329,7 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
             f,
             row_group_metadata,
             props,
-        )))
+        )?))
     }
 
     fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
@@ -342,6 +342,7 @@ pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
     chunk_reader: Arc<R>,
     metadata: &'a RowGroupMetaData,
     props: ReaderPropertiesPtr,
+    bloom_filters: Vec<Option<Sbbf>>,
 }
 
 impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
@@ -350,12 +351,22 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
         chunk_reader: Arc<R>,
         metadata: &'a RowGroupMetaData,
         props: ReaderPropertiesPtr,
-    ) -> Self {
-        Self {
+    ) -> Result<Self> {
+        let bloom_filters = if props.read_bloom_filter() {
+            metadata
+                .columns()
+                .iter()
+                .map(|col| Sbbf::read_from_column_chunk(col, chunk_reader.clone()))
+                .collect::<Result<Vec<_>>>()?
+        } else {
+            iter::repeat(None).take(metadata.columns().len()).collect()
+        };
+        Ok(Self {
             chunk_reader,
             metadata,
             props,
-        }
+            bloom_filters,
+        })
     }
 }
 
@@ -388,11 +399,9 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
         )?))
     }
 
-    #[cfg(feature = "bloom")]
     /// get bloom filter for the `i`th column
-    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>> {
-        let col = self.metadata.column(i);
-        Sbbf::read_from_column_chunk(col, self.chunk_reader.clone())
+    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
+        self.bloom_filters[i].as_ref()
     }
 
     fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 2fe0b26e7..3f1731687 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,10 +18,10 @@
 //! Contains file writer API, and provides methods to write row groups and columns by
 //! using row group writers and column writers respectively.
 
-use std::{io::Write, sync::Arc};
-
+use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use std::{io::Write, sync::Arc};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
 
 use crate::basic::PageType;
@@ -92,6 +92,7 @@ pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()
 pub type OnCloseRowGroup<'a> = Box<
     dyn FnOnce(
             RowGroupMetaDataPtr,
+            Vec<Option<Sbbf>>,
             Vec<Option<ColumnIndex>>,
             Vec<Option<OffsetIndex>>,
         ) -> Result<()>
@@ -116,6 +117,7 @@ pub struct SerializedFileWriter<W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaDataPtr>,
+    bloom_filters: Vec<Vec<Option<Sbbf>>>,
     column_indexes: Vec<Vec<Option<ColumnIndex>>>,
     offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
     row_group_index: usize,
@@ -132,6 +134,7 @@ impl<W: Write> SerializedFileWriter<W> {
             descr: Arc::new(SchemaDescriptor::new(schema)),
             props: properties,
             row_groups: vec![],
+            bloom_filters: vec![],
             column_indexes: Vec::new(),
             offset_indexes: Vec::new(),
             row_group_index: 0,
@@ -149,10 +152,15 @@ impl<W: Write> SerializedFileWriter<W> {
         self.row_group_index += 1;
 
         let row_groups = &mut self.row_groups;
+        let row_bloom_filters = &mut self.bloom_filters;
         let row_column_indexes = &mut self.column_indexes;
         let row_offset_indexes = &mut self.offset_indexes;
-        let on_close = |metadata, row_group_column_index, row_group_offset_index| {
+        let on_close = |metadata,
+                        row_group_bloom_filter,
+                        row_group_column_index,
+                        row_group_offset_index| {
             row_groups.push(metadata);
+            row_bloom_filters.push(row_group_bloom_filter);
             row_column_indexes.push(row_group_column_index);
             row_offset_indexes.push(row_group_offset_index);
             Ok(())
@@ -212,6 +220,31 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    /// Serialize all the bloom filter to the file
+    fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write bloom filter to the file
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
+                match &self.bloom_filters[row_group_idx][column_idx] {
+                    Some(bloom_filter) => {
+                        let start_offset = self.buf.bytes_written();
+                        bloom_filter.write(&mut self.buf)?;
+                        // set offset and index for bloom filter
+                        column_chunk
+                            .meta_data
+                            .as_mut()
+                            .expect("can't have bloom filter without column metadata")
+                            .bloom_filter_offset = Some(start_offset as i64);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Serialize all the column index to the file
     fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
         // iter row group
@@ -250,6 +283,7 @@ impl<W: Write> SerializedFileWriter<W> {
             .map(|v| v.to_thrift())
             .collect::<Vec<_>>();
 
+        self.write_bloom_filters(&mut row_groups)?;
         // Write column indexes and offset indexes
         self.write_column_indexes(&mut row_groups)?;
         self.write_offset_indexes(&mut row_groups)?;
@@ -320,6 +354,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     column_index: usize,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    bloom_filters: Vec<Option<Sbbf>>,
     column_indexes: Vec<Option<ColumnIndex>>,
     offset_indexes: Vec<Option<OffsetIndex>>,
     on_close: Option<OnCloseRowGroup<'a>>,
@@ -348,6 +383,7 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             column_index: 0,
             row_group_metadata: None,
             column_chunks: Vec::with_capacity(num_columns),
+            bloom_filters: Vec::with_capacity(num_columns),
             column_indexes: Vec::with_capacity(num_columns),
             offset_indexes: Vec::with_capacity(num_columns),
             total_bytes_written: 0,
@@ -380,11 +416,13 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         let column_chunks = &mut self.column_chunks;
         let column_indexes = &mut self.column_indexes;
         let offset_indexes = &mut self.offset_indexes;
+        let bloom_filters = &mut self.bloom_filters;
 
         let on_close = |r: ColumnCloseResult| {
             // Update row group writer metrics
             *total_bytes_written += r.bytes_written;
             column_chunks.push(r.metadata);
+            bloom_filters.push(r.bloom_filter);
             column_indexes.push(r.column_index);
             offset_indexes.push(r.offset_index);
 
@@ -443,6 +481,7 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             if let Some(on_close) = self.on_close.take() {
                 on_close(
                     metadata,
+                    self.bloom_filters.clone(),
                     self.column_indexes.clone(),
                     self.offset_indexes.clone(),
                 )?
@@ -623,6 +662,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
 
         Ok(spec)
     }
+
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
         metadata
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index cd29d02f8..4cdba1dc5 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -84,7 +84,6 @@ pub mod arrow;
 pub mod column;
 experimental!(mod compression);
 experimental!(mod encodings);
-#[cfg(feature = "bloom")]
 pub mod bloom_filter;
 pub mod file;
 pub mod record;