You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/15 15:01:41 UTC

[arrow-rs] branch add-bloom-filter-3 created (now 5d7624860)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a change to branch add-bloom-filter-3
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


      at 5d7624860 encode writing

This branch includes the following new commits:

     new 777b0dc6f add column setter
     new 63fa6434a add writer properties
     new 9b8a0f515 write out to bloom filter
     new 5d7624860 encode writing

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow-rs] 04/04: encode writing

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch add-bloom-filter-3
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 5d7624860d3c8aed11b4ed04b0d35ccbcc1802f2
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Nov 15 23:01:26 2022 +0800

    encode writing
---
 parquet/src/bloom_filter/mod.rs | 26 ++++++++++++++++++++++++++
 parquet/src/file/writer.rs      | 16 +++++++++++-----
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index d0bee8a5f..0122a3a76 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -24,9 +24,11 @@ use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::reader::ChunkReader;
 use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+    SplitBlockAlgorithm, Uncompressed, XxHash,
 };
 use bytes::{Buf, Bytes};
 use std::hash::Hasher;
+use std::io::Write;
 use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;
@@ -129,6 +131,30 @@ impl Sbbf {
         Self(data)
     }
 
+    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+        for block in &self.0 {
+            for word in block {
+                writer.write_all(&word.to_le_bytes()).map_err(|e| {
+                    ParquetError::General(format!(
+                        "Could not write bloom filter bit set: {}",
+                        e
+                    ))
+                })?;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn header(&self) -> BloomFilterHeader {
+        BloomFilterHeader {
+            // 8 i32 per block, 4 bytes per i32
+            num_bytes: self.0.len() as i32 * 4 * 8,
+            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
+            hash: BloomFilterHash::XXHASH(XxHash {}),
+            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
+        }
+    }
+
     pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
         reader: Arc<R>,
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 90c9b6bfc..bf6ec93fa 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -230,11 +230,16 @@ impl<W: Write> SerializedFileWriter<W> {
                     Some(bloom_filter) => {
                         let start_offset = self.buf.bytes_written();
                         let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
-                        bloom_filter.write_to_out_protocol(&mut protocol)?;
+                        let header = bloom_filter.header();
+                        header.write_to_out_protocol(&mut protocol)?;
                         protocol.flush()?;
-                        let end_offset = self.buf.bytes_written();
+                        bloom_filter.write_bitset(&mut self.buf)?;
                         // set offset and index for bloom filter
-                        column_metadata.bloom_filter_offset = Some(start_offset as i64);
+                        column_metadata
+                            .meta_data
+                            .as_mut()
+                            .expect("can't have bloom filter without column metadata")
+                            .bloom_filter_offset = Some(start_offset as i64);
                     }
                     None => {}
                 }
@@ -424,10 +429,10 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             // Update row group writer metrics
             *total_bytes_written += r.bytes_written;
             column_chunks.push(r.metadata);
-            column_indexes.push(r.column_index);
-            offset_indexes.push(r.offset_index);
             #[cfg(feature = "bloom")]
             bloom_filters.push(r.bloom_filter);
+            column_indexes.push(r.column_index);
+            offset_indexes.push(r.offset_index);
 
             if let Some(rows) = *total_rows_written {
                 if rows != r.rows_written {
@@ -663,6 +668,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
 
         Ok(spec)
     }
+
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
         metadata


[arrow-rs] 01/04: add column setter

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch add-bloom-filter-3
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 777b0dc6f7d4a08af896772893071681c9d17b21
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Nov 15 20:53:32 2022 +0800

    add column setter
---
 parquet/Cargo.toml             |   1 +
 parquet/src/file/properties.rs | 102 +++++++++++++++++++++++++++++++++++------
 2 files changed, 89 insertions(+), 14 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index fc7c8218a..72baaf338 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -58,6 +58,7 @@ futures = { version = "0.3", default-features = false, features = ["std"], optio
 tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
 hashbrown = { version = "0.13", default-features = false }
 twox-hash = { version = "1.6", optional = true }
+paste = "1.0"
 
 [dev-dependencies]
 base64 = { version = "0.13", default-features = false, features = ["std"] }
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index cf821df21..c0e789ca1 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -248,6 +248,15 @@ impl WriterProperties {
             .or_else(|| self.default_column_properties.max_statistics_size())
             .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE)
     }
+
+    /// Returns `true` if bloom filter is enabled for a column.
+    pub fn bloom_filter_enabled(&self, col: &ColumnPath) -> bool {
+        self.column_properties
+            .get(col)
+            .and_then(|c| c.bloom_filter_enabled())
+            .or_else(|| self.default_column_properties.bloom_filter_enabled())
+            .unwrap_or(false)
+    }
 }
 
 /// Writer properties builder.
@@ -264,6 +273,16 @@ pub struct WriterPropertiesBuilder {
     column_properties: HashMap<ColumnPath, ColumnProperties>,
 }
 
+macro_rules! def_per_col_setter {
+    ($field:ident, $field_type:expr) => {
+        // The macro will expand into the contents of this block.
+        pub fn concat_idents!(set_, $field)(mut self, value: $field_type) -> Self {
+            self.$field = value;
+            self
+        }
+    };
+}
+
 impl WriterPropertiesBuilder {
     /// Returns default state of the builder.
     fn with_defaults() -> Self {
@@ -276,7 +295,7 @@ impl WriterPropertiesBuilder {
             writer_version: DEFAULT_WRITER_VERSION,
             created_by: DEFAULT_CREATED_BY.to_string(),
             key_value_metadata: None,
-            default_column_properties: ColumnProperties::new(),
+            default_column_properties: Default::default(),
             column_properties: HashMap::new(),
         }
     }
@@ -306,6 +325,8 @@ impl WriterPropertiesBuilder {
         self
     }
 
+    def_per_col_setter!(writer_version, WriterVersion);
+
     /// Sets best effort maximum size of a data page in bytes.
     ///
     /// Note: this is a best effort limit based on value of
@@ -423,7 +444,7 @@ impl WriterPropertiesBuilder {
     fn get_mut_props(&mut self, col: ColumnPath) -> &mut ColumnProperties {
         self.column_properties
             .entry(col)
-            .or_insert_with(ColumnProperties::new)
+            .or_insert_with(Default::default)
     }
 
     /// Sets encoding for a column.
@@ -476,6 +497,17 @@ impl WriterPropertiesBuilder {
         self.get_mut_props(col).set_max_statistics_size(value);
         self
     }
+
+    /// Sets bloom filter enabled for a column.
+    /// Takes precedence over globally defined settings.
+    pub fn set_column_bloom_filter_enabled(
+        mut self,
+        col: ColumnPath,
+        value: bool,
+    ) -> Self {
+        self.get_mut_props(col).set_bloom_filter_enabled(value);
+        self
+    }
 }
 
 /// Controls the level of statistics to be computed by the writer
@@ -499,27 +531,24 @@ impl Default for EnabledStatistics {
 ///
 /// If a field is `None`, it means that no specific value has been set for this column,
 /// so some subsequent or default value must be used.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, Default, PartialEq)]
 struct ColumnProperties {
     encoding: Option<Encoding>,
     codec: Option<Compression>,
     dictionary_enabled: Option<bool>,
     statistics_enabled: Option<EnabledStatistics>,
     max_statistics_size: Option<usize>,
+    /// bloom filter enabled
+    bloom_filter_enabled: Option<bool>,
+    /// bloom filter expected number of distinct values
+    bloom_filter_ndv: Option<u64>,
+    /// bloom filter false positive probability
+    bloom_filter_fpp: Option<f64>,
+    /// bloom filter max number of bytes
+    bloom_filter_max_bytes: Option<u32>,
 }
 
 impl ColumnProperties {
-    /// Initialise column properties with default values.
-    fn new() -> Self {
-        Self {
-            encoding: None,
-            codec: None,
-            dictionary_enabled: None,
-            statistics_enabled: None,
-            max_statistics_size: None,
-        }
-    }
-
     /// Sets encoding for this column.
     ///
     /// If dictionary is not enabled, this is treated as a primary encoding for a column.
@@ -556,6 +585,26 @@ impl ColumnProperties {
         self.max_statistics_size = Some(value);
     }
 
+    /// Sets bloom filter enabled
+    fn set_bloom_filter_enabled(&mut self, enabled: bool) {
+        self.bloom_filter_enabled = Some(enabled);
+    }
+
+    /// Sets bloom filter max size in bytes
+    fn set_bloom_filter_max_size(&mut self, value: u32) {
+        self.bloom_filter_max_bytes = Some(value);
+    }
+
+    /// Sets bloom filter expected number of distinct values
+    fn set_bloom_filter_ndv(&mut self, value: u64) {
+        self.bloom_filter_ndv = Some(value);
+    }
+
+    /// Sets bloom filter false positive probability
+    fn set_bloom_filter_fpp(&mut self, value: f64) {
+        self.bloom_filter_fpp = Some(value);
+    }
+
     /// Returns optional encoding for this column.
     fn encoding(&self) -> Option<Encoding> {
         self.encoding
@@ -583,6 +632,30 @@ impl ColumnProperties {
     fn max_statistics_size(&self) -> Option<usize> {
         self.max_statistics_size
     }
+
+    /// Returns `Some(true)` if bloom filter is enabled for this column, if disabled then
+    /// returns `Some(false)`. If result is `None`, then no setting has been provided.
+    fn bloom_filter_enabled(&self) -> Option<bool> {
+        self.bloom_filter_enabled
+    }
+
+    /// Returns `Some(u32)` if bloom filter max size in bytes is set for this column,
+    /// if not set then returns `None`.
+    fn bloom_filter_max_bytes(&self) -> Option<u32> {
+        self.bloom_filter_max_bytes
+    }
+
+    /// Returns `Some(u64)` if bloom filter number of distinct values is set for this column,
+    /// if not set then returns `None`.
+    fn bloom_filter_ndv(&self) -> Option<u64> {
+        self.bloom_filter_ndv
+    }
+
+    /// Returns `Some(f64)` if bloom filter false positive probability is set for this column,
+    /// if not set then returns `None`.
+    fn bloom_filter_fpp(&self) -> Option<f64> {
+        self.bloom_filter_fpp
+    }
 }
 
 /// Reference counted reader properties.
@@ -685,6 +758,7 @@ mod tests {
             props.max_statistics_size(&ColumnPath::from("col")),
             DEFAULT_MAX_STATISTICS_SIZE
         );
+        assert_eq!(props.bloom_filter_enabled(&ColumnPath::from("col")), false);
     }
 
     #[test]


[arrow-rs] 02/04: add writer properties

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch add-bloom-filter-3
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 63fa6434aca28e9f646bf6840334fdf6fe0abedc
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Nov 15 21:23:02 2022 +0800

    add writer properties
---
 parquet/src/file/properties.rs | 148 ++++++++++++++++++++++-------------------
 1 file changed, 80 insertions(+), 68 deletions(-)

diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c0e789ca1..c62bfe0bc 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -64,6 +64,7 @@
 //!     .build();
 //! ```
 
+use paste::paste;
 use std::{collections::HashMap, sync::Arc};
 
 use crate::basic::{Compression, Encoding};
@@ -81,6 +82,9 @@ const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
 const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
 const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
 const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
+const DEFAULT_BLOOM_FILTER_ENABLED: bool = false;
+const DEFAULT_BLOOM_FILTER_MAX_BYTES: u32 = 1024 * 1024;
+const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.01;
 
 /// Parquet writer version.
 ///
@@ -123,6 +127,26 @@ pub struct WriterProperties {
     column_properties: HashMap<ColumnPath, ColumnProperties>,
 }
 
+macro_rules! def_col_property_getter {
+    ($field:ident, $field_type:ty) => {
+        pub fn $field(&self, col: &ColumnPath) -> Option<$field_type> {
+            self.column_properties
+                .get(col)
+                .and_then(|c| c.$field())
+                .or_else(|| self.default_column_properties.$field())
+        }
+    };
+    ($field:ident, $field_type:ty, $default_val:expr) => {
+        pub fn $field(&self, col: &ColumnPath) -> $field_type {
+            self.column_properties
+                .get(col)
+                .and_then(|c| c.$field())
+                .or_else(|| self.default_column_properties.$field())
+                .unwrap_or($default_val)
+        }
+    };
+}
+
 impl WriterProperties {
     /// Returns builder for writer properties with default values.
     pub fn builder() -> WriterPropertiesBuilder {
@@ -249,14 +273,10 @@ impl WriterProperties {
             .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE)
     }
 
-    /// Returns `true` if bloom filter is enabled for a column.
-    pub fn bloom_filter_enabled(&self, col: &ColumnPath) -> bool {
-        self.column_properties
-            .get(col)
-            .and_then(|c| c.bloom_filter_enabled())
-            .or_else(|| self.default_column_properties.bloom_filter_enabled())
-            .unwrap_or(false)
-    }
+    def_col_property_getter!(bloom_filter_enabled, bool, DEFAULT_BLOOM_FILTER_ENABLED);
+    def_col_property_getter!(bloom_filter_fpp, f64, DEFAULT_BLOOM_FILTER_FPP);
+    def_col_property_getter!(bloom_filter_ndv, u64);
+    def_col_property_getter!(bloom_filter_max_bytes, u32, DEFAULT_BLOOM_FILTER_MAX_BYTES);
 }
 
 /// Writer properties builder.
@@ -273,16 +293,40 @@ pub struct WriterPropertiesBuilder {
     column_properties: HashMap<ColumnPath, ColumnProperties>,
 }
 
-macro_rules! def_per_col_setter {
-    ($field:ident, $field_type:expr) => {
-        // The macro will expand into the contents of this block.
-        pub fn concat_idents!(set_, $field)(mut self, value: $field_type) -> Self {
-            self.$field = value;
-            self
+macro_rules! def_opt_field_setter {
+    ($field: ident, $type: ty) => {
+        paste! {
+            pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
+                self.$field = Some(value);
+                self
+            }
+        }
+    };
+}
+
+macro_rules! def_opt_field_getter {
+    ($field: ident, $type: ty) => {
+        paste! {
+            #[doc = "Returns " $field " if set."]
+            pub fn $field(&self) -> Option<$type> {
+                self.$field
+            }
         }
     };
 }
 
+macro_rules! def_per_col_setter {
+    ($field:ident, $field_type:ty) => {
+        paste! {
+            #[doc = "Sets " $field " for a column. Takes precedence over globally defined settings."]
+            pub fn [<set_column_ $field>](mut self, col: ColumnPath, value: $field_type) -> Self {
+                self.get_mut_props(col).[<set_ $field>](value);
+                self
+            }
+        }
+    }
+}
+
 impl WriterPropertiesBuilder {
     /// Returns default state of the builder.
     fn with_defaults() -> Self {
@@ -325,8 +369,6 @@ impl WriterPropertiesBuilder {
         self
     }
 
-    def_per_col_setter!(writer_version, WriterVersion);
-
     /// Sets best effort maximum size of a data page in bytes.
     ///
     /// Note: this is a best effort limit based on value of
@@ -498,16 +540,10 @@ impl WriterPropertiesBuilder {
         self
     }
 
-    /// Sets bloom filter enabled for a column.
-    /// Takes precedence over globally defined settings.
-    pub fn set_column_bloom_filter_enabled(
-        mut self,
-        col: ColumnPath,
-        value: bool,
-    ) -> Self {
-        self.get_mut_props(col).set_bloom_filter_enabled(value);
-        self
-    }
+    def_per_col_setter!(bloom_filter_enabled, bool);
+    def_per_col_setter!(bloom_filter_fpp, f64);
+    def_per_col_setter!(bloom_filter_max_bytes, u32);
+    def_per_col_setter!(bloom_filter_ndv, u64);
 }
 
 /// Controls the level of statistics to be computed by the writer
@@ -585,25 +621,10 @@ impl ColumnProperties {
         self.max_statistics_size = Some(value);
     }
 
-    /// Sets bloom filter enabled
-    fn set_bloom_filter_enabled(&mut self, enabled: bool) {
-        self.bloom_filter_enabled = Some(enabled);
-    }
-
-    /// Sets bloom filter max size in bytes
-    fn set_bloom_filter_max_size(&mut self, value: u32) {
-        self.bloom_filter_max_bytes = Some(value);
-    }
-
-    /// Sets bloom filter expected number of distinct values
-    fn set_bloom_filter_ndv(&mut self, value: u64) {
-        self.bloom_filter_ndv = Some(value);
-    }
-
-    /// Sets bloom filter false positive probability
-    fn set_bloom_filter_fpp(&mut self, value: f64) {
-        self.bloom_filter_fpp = Some(value);
-    }
+    def_opt_field_setter!(bloom_filter_enabled, bool);
+    def_opt_field_setter!(bloom_filter_fpp, f64);
+    def_opt_field_setter!(bloom_filter_max_bytes, u32);
+    def_opt_field_setter!(bloom_filter_ndv, u64);
 
     /// Returns optional encoding for this column.
     fn encoding(&self) -> Option<Encoding> {
@@ -633,29 +654,10 @@ impl ColumnProperties {
         self.max_statistics_size
     }
 
-    /// Returns `Some(true)` if bloom filter is enabled for this column, if disabled then
-    /// returns `Some(false)`. If result is `None`, then no setting has been provided.
-    fn bloom_filter_enabled(&self) -> Option<bool> {
-        self.bloom_filter_enabled
-    }
-
-    /// Returns `Some(u32)` if bloom filter max size in bytes is set for this column,
-    /// if not set then returns `None`.
-    fn bloom_filter_max_bytes(&self) -> Option<u32> {
-        self.bloom_filter_max_bytes
-    }
-
-    /// Returns `Some(u64)` if bloom filter number of distinct values is set for this column,
-    /// if not set then returns `None`.
-    fn bloom_filter_ndv(&self) -> Option<u64> {
-        self.bloom_filter_ndv
-    }
-
-    /// Returns `Some(f64)` if bloom filter false positive probability is set for this column,
-    /// if not set then returns `None`.
-    fn bloom_filter_fpp(&self) -> Option<f64> {
-        self.bloom_filter_fpp
-    }
+    def_opt_field_getter!(bloom_filter_enabled, bool);
+    def_opt_field_getter!(bloom_filter_fpp, f64);
+    def_opt_field_getter!(bloom_filter_max_bytes, u32);
+    def_opt_field_getter!(bloom_filter_ndv, u64);
 }
 
 /// Reference counted reader properties.
@@ -759,6 +761,12 @@ mod tests {
             DEFAULT_MAX_STATISTICS_SIZE
         );
         assert_eq!(props.bloom_filter_enabled(&ColumnPath::from("col")), false);
+        assert_eq!(props.bloom_filter_fpp(&ColumnPath::from("col")), 0.01);
+        assert_eq!(props.bloom_filter_ndv(&ColumnPath::from("col")), None);
+        assert_eq!(
+            props.bloom_filter_max_bytes(&ColumnPath::from("col")),
+            1024 * 1024
+        );
     }
 
     #[test]
@@ -842,6 +850,10 @@ mod tests {
                 EnabledStatistics::Chunk,
             )
             .set_column_max_statistics_size(ColumnPath::from("col"), 123)
+            .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
+            .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100)
+            .set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
+            .set_column_bloom_filter_max_bytes(ColumnPath::from("col"), 1000)
             .build();
 
         assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);


[arrow-rs] 03/04: write out to bloom filter

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch add-bloom-filter-3
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 9b8a0f51517b3235ccd57461f439a400dbbee4c1
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Nov 15 21:47:56 2022 +0800

    write out to bloom filter
---
 parquet/src/bloom_filter/mod.rs  |  1 +
 parquet/src/column/writer/mod.rs | 15 ++++++++++++++
 parquet/src/file/writer.rs       | 45 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 4944a93f8..d0bee8a5f 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -80,6 +80,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
 }
 
 /// A split block Bloom filter
+#[derive(Debug, Clone)]
 pub struct Sbbf(Vec<Block>);
 
 const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3cdf04f54..f8e79d792 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -16,6 +16,9 @@
 // under the License.
 
 //! Contains column writer API.
+
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::format::{ColumnIndex, OffsetIndex};
 use std::collections::{BTreeSet, VecDeque};
 
@@ -154,6 +157,9 @@ pub struct ColumnCloseResult {
     pub rows_written: u64,
     /// Metadata for this column chunk
     pub metadata: ColumnChunkMetaData,
+    /// Optional bloom filter for this column
+    #[cfg(feature = "bloom")]
+    pub bloom_filter: Option<Sbbf>,
     /// Optional column index, for filtering
     pub column_index: Option<ColumnIndex>,
     /// Optional offset index, identifying page locations
@@ -209,6 +215,10 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
 
+    // bloom filter
+    #[cfg(feature = "bloom")]
+    bloom_filter: Option<Sbbf>,
+
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: OffsetIndexBuilder,
@@ -260,6 +270,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_column_nulls: 0,
                 column_distinct_count: None,
             },
+            // TODO: initialize the bloom filter here; it is always `None` for now
+            #[cfg(feature = "bloom")]
+            bloom_filter: None,
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -458,6 +471,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         Ok(ColumnCloseResult {
             bytes_written: self.column_metrics.total_bytes_written,
             rows_written: self.column_metrics.total_rows_written,
+            #[cfg(feature = "bloom")]
+            bloom_filter: self.bloom_filter,
             metadata,
             column_index,
             offset_index,
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 2efaf7caf..90c9b6bfc 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,10 +18,11 @@
 //! Contains file writer API, and provides methods to write row groups and columns by
 //! using row group writers and column writers respectively.
 
-use std::{io::Write, sync::Arc};
-
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use std::{io::Write, sync::Arc};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
 
 use crate::basic::PageType;
@@ -116,6 +117,8 @@ pub struct SerializedFileWriter<W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaDataPtr>,
+    #[cfg(feature = "bloom")]
+    bloom_filters: Vec<Vec<Option<Sbbf>>>,
     column_indexes: Vec<Vec<Option<ColumnIndex>>>,
     offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
     row_group_index: usize,
@@ -132,6 +135,8 @@ impl<W: Write> SerializedFileWriter<W> {
             descr: Arc::new(SchemaDescriptor::new(schema)),
             props: properties,
             row_groups: vec![],
+            #[cfg(feature = "bloom")]
+            bloom_filters: vec![],
             column_indexes: Vec::new(),
             offset_indexes: Vec::new(),
             row_group_index: 0,
@@ -212,6 +217,32 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    #[cfg(feature = "bloom")]
+    /// Serialize all the bloom filters to the file
+    fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write bloom filter to the file
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
+            {
+                match &self.bloom_filters[row_group_idx][column_idx] {
+                    Some(bloom_filter) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        bloom_filter.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and index for bloom filter
+                        column_metadata.bloom_filter_offset = Some(start_offset as i64);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Serialize all the column index to the file
     fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
         // iter row group
@@ -250,6 +281,8 @@ impl<W: Write> SerializedFileWriter<W> {
             .map(|v| v.to_thrift())
             .collect::<Vec<_>>();
 
+        #[cfg(feature = "bloom")]
+        self.write_bloom_filters(&mut row_groups)?;
         // Write column indexes and offset indexes
         self.write_column_indexes(&mut row_groups)?;
         self.write_offset_indexes(&mut row_groups)?;
@@ -320,6 +353,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     column_index: usize,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    #[cfg(feature = "bloom")]
+    bloom_filters: Vec<Option<Sbbf>>,
     column_indexes: Vec<Option<ColumnIndex>>,
     offset_indexes: Vec<Option<OffsetIndex>>,
     on_close: Option<OnCloseRowGroup<'a>>,
@@ -348,6 +383,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             column_index: 0,
             row_group_metadata: None,
             column_chunks: Vec::with_capacity(num_columns),
+            #[cfg(feature = "bloom")]
+            bloom_filters: Vec::with_capacity(num_columns),
             column_indexes: Vec::with_capacity(num_columns),
             offset_indexes: Vec::with_capacity(num_columns),
             total_bytes_written: 0,
@@ -380,6 +417,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         let column_chunks = &mut self.column_chunks;
         let column_indexes = &mut self.column_indexes;
         let offset_indexes = &mut self.offset_indexes;
+        #[cfg(feature = "bloom")]
+        let bloom_filters = &mut self.bloom_filters;
 
         let on_close = |r: ColumnCloseResult| {
             // Update row group writer metrics
@@ -387,6 +426,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             column_chunks.push(r.metadata);
             column_indexes.push(r.column_index);
             offset_indexes.push(r.offset_index);
+            #[cfg(feature = "bloom")]
+            bloom_filters.push(r.bloom_filter);
 
             if let Some(rows) = *total_rows_written {
                 if rows != r.rows_written {