Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/15 15:01:44 UTC

[arrow-rs] 03/04: write out to bloom filter

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch add-bloom-filter-3
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 9b8a0f51517b3235ccd57461f439a400dbbee4c1
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Nov 15 21:47:56 2022 +0800

    write out to bloom filter
---
 parquet/src/bloom_filter/mod.rs  |  1 +
 parquet/src/column/writer/mod.rs | 15 ++++++++++++++
 parquet/src/file/writer.rs       | 45 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 4944a93f8..d0bee8a5f 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -80,6 +80,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
 }
 
 /// A split block Bloom filter
+#[derive(Debug, Clone)]
 pub struct Sbbf(Vec<Block>);
 
 const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
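For reference, the `Sbbf` type above is a split block Bloom filter (SBBF) as defined in the Parquet specification: a vector of 256-bit blocks, each made of eight 32-bit words, where a hash selects one bit per word via eight fixed salt constants. The sketch below is a minimal standalone illustration of the block-level insert/check written against the published spec, not this crate's private helpers; the `Block` alias and function names are illustrative, and the salt constants are the ones given in the Parquet bloom filter specification.

    /// A 256-bit block: eight 32-bit words, as defined by the Parquet SBBF spec.
    type Block = [u32; 8];

    /// Salt constants published in the Parquet bloom filter specification.
    const SALT: [u32; 8] = [
        0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
        0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31,
    ];

    /// Derive, for each of the eight words, the single bit selected by the hash.
    fn block_mask(hash: u32) -> Block {
        let mut mask = [0u32; 8];
        for i in 0..8 {
            // Multiply by the i-th salt and keep the top five bits as the bit index.
            let bit = hash.wrapping_mul(SALT[i]) >> 27;
            mask[i] = 1 << bit;
        }
        mask
    }

    /// Insert: set the selected bit in every word of the block.
    fn block_insert(block: &mut Block, hash: u32) {
        let mask = block_mask(hash);
        for i in 0..8 {
            block[i] |= mask[i];
        }
    }

    /// Check: the hash may be present only if every selected bit is set.
    fn block_check(block: &Block, hash: u32) -> bool {
        let mask = block_mask(hash);
        (0..8).all(|i| (block[i] & mask[i]) == mask[i])
    }

    fn main() {
        let mut block: Block = [0; 8];
        block_insert(&mut block, 0xdead_beef);
        assert!(block_check(&block, 0xdead_beef));
        // A different hash is very unlikely (but not guaranteed) to pass the check.
        println!("0x12345678 maybe present: {}", block_check(&block, 0x1234_5678));
    }
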
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3cdf04f54..f8e79d792 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -16,6 +16,9 @@
 // under the License.
 
 //! Contains column writer API.
+
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::format::{ColumnIndex, OffsetIndex};
 use std::collections::{BTreeSet, VecDeque};
 
@@ -154,6 +157,9 @@ pub struct ColumnCloseResult {
     pub rows_written: u64,
     /// Metadata for this column chunk
     pub metadata: ColumnChunkMetaData,
+    /// Optional bloom filter for this column
+    #[cfg(feature = "bloom")]
+    pub bloom_filter: Option<Sbbf>,
     /// Optional column index, for filtering
     pub column_index: Option<ColumnIndex>,
     /// Optional offset index, identifying page locations
@@ -209,6 +215,10 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
 
+    // bloom filter
+    #[cfg(feature = "bloom")]
+    bloom_filter: Option<Sbbf>,
+
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: OffsetIndexBuilder,
@@ -260,6 +270,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_column_nulls: 0,
                 column_distinct_count: None,
             },
+            // TODO: initialize the bloom filter from the writer properties
+            #[cfg(feature = "bloom")]
+            bloom_filter: None,
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -458,6 +471,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         Ok(ColumnCloseResult {
             bytes_written: self.column_metrics.total_bytes_written,
             rows_written: self.column_metrics.total_rows_written,
+            #[cfg(feature = "bloom")]
+            bloom_filter: self.bloom_filter,
             metadata,
             column_index,
             offset_index,
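The column writer changes above rely on the fact that `#[cfg(feature = ...)]` can be applied to individual struct fields, to struct-literal entries, and to statements, so the bloom filter only exists in the compiled code when the `bloom` feature is enabled. A reduced sketch of that pattern follows, assuming the `bloom` feature name used in the diff; `Sbbf` is stubbed as a unit struct and `ColumnWriter` is a simplified stand-in so the snippet stands alone.

    // Stand-in for parquet::bloom_filter::Sbbf so the sketch compiles on its own.
    #[cfg(feature = "bloom")]
    #[derive(Debug, Clone)]
    pub struct Sbbf;

    pub struct ColumnCloseResult {
        pub rows_written: u64,
        // The field is only compiled in when the feature is enabled ...
        #[cfg(feature = "bloom")]
        pub bloom_filter: Option<Sbbf>,
    }

    pub struct ColumnWriter {
        rows_written: u64,
        #[cfg(feature = "bloom")]
        bloom_filter: Option<Sbbf>,
    }

    impl ColumnWriter {
        pub fn new() -> Self {
            Self {
                rows_written: 0,
                // ... and so must its initializer in every struct literal.
                #[cfg(feature = "bloom")]
                bloom_filter: None,
            }
        }

        pub fn close(self) -> ColumnCloseResult {
            ColumnCloseResult {
                rows_written: self.rows_written,
                #[cfg(feature = "bloom")]
                bloom_filter: self.bloom_filter,
            }
        }
    }

    fn main() {
        let result = ColumnWriter::new().close();
        println!("rows written: {}", result.rows_written);
    }
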
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 2efaf7caf..90c9b6bfc 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,10 +18,11 @@
 //! Contains file writer API, and provides methods to write row groups and columns by
 //! using row group writers and column writers respectively.
 
-use std::{io::Write, sync::Arc};
-
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use std::{io::Write, sync::Arc};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
 
 use crate::basic::PageType;
@@ -116,6 +117,8 @@ pub struct SerializedFileWriter<W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaDataPtr>,
+    #[cfg(feature = "bloom")]
+    bloom_filters: Vec<Vec<Option<Sbbf>>>,
     column_indexes: Vec<Vec<Option<ColumnIndex>>>,
     offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
     row_group_index: usize,
@@ -132,6 +135,8 @@ impl<W: Write> SerializedFileWriter<W> {
             descr: Arc::new(SchemaDescriptor::new(schema)),
             props: properties,
             row_groups: vec![],
+            #[cfg(feature = "bloom")]
+            bloom_filters: vec![],
             column_indexes: Vec::new(),
             offset_indexes: Vec::new(),
             row_group_index: 0,
@@ -212,6 +217,32 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    #[cfg(feature = "bloom")]
+    /// Serialize all the bloom filters to the file
+    fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        // iterate over each row group, then over each column chunk within it,
+        // writing out the bloom filter (if any) and recording its starting
+        // offset in the column chunk metadata
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
+            {
+                match &self.bloom_filters[row_group_idx][column_idx] {
+                    Some(bloom_filter) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        bloom_filter.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // record the bloom filter's starting offset in the column chunk metadata
+                        column_metadata.bloom_filter_offset = Some(start_offset as i64);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Serialize all the column index to the file
     fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
         // iter row group
@@ -250,6 +281,8 @@ impl<W: Write> SerializedFileWriter<W> {
             .map(|v| v.to_thrift())
             .collect::<Vec<_>>();
 
+        #[cfg(feature = "bloom")]
+        self.write_bloom_filters(&mut row_groups)?;
         // Write column indexes and offset indexes
         self.write_column_indexes(&mut row_groups)?;
         self.write_offset_indexes(&mut row_groups)?;
@@ -320,6 +353,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     column_index: usize,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    #[cfg(feature = "bloom")]
+    bloom_filters: Vec<Option<Sbbf>>,
     column_indexes: Vec<Option<ColumnIndex>>,
     offset_indexes: Vec<Option<OffsetIndex>>,
     on_close: Option<OnCloseRowGroup<'a>>,
@@ -348,6 +383,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             column_index: 0,
             row_group_metadata: None,
             column_chunks: Vec::with_capacity(num_columns),
+            #[cfg(feature = "bloom")]
+            bloom_filters: Vec::with_capacity(num_columns),
             column_indexes: Vec::with_capacity(num_columns),
             offset_indexes: Vec::with_capacity(num_columns),
             total_bytes_written: 0,
@@ -380,6 +417,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         let column_chunks = &mut self.column_chunks;
         let column_indexes = &mut self.column_indexes;
         let offset_indexes = &mut self.offset_indexes;
+        #[cfg(feature = "bloom")]
+        let bloom_filters = &mut self.bloom_filters;
 
         let on_close = |r: ColumnCloseResult| {
             // Update row group writer metrics
@@ -387,6 +426,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             column_chunks.push(r.metadata);
             column_indexes.push(r.column_index);
             offset_indexes.push(r.offset_index);
+            #[cfg(feature = "bloom")]
+            bloom_filters.push(r.bloom_filter);
 
             if let Some(rows) = *total_rows_written {
                 if rows != r.rows_written {
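The offset bookkeeping in `write_bloom_filters` depends on the sink being able to report how many bytes it has emitted so far (`self.buf.bytes_written()`), so that the position stored in `bloom_filter_offset` points at the start of the serialized filter. Below is a minimal sketch of that idea using a hypothetical `CountingWriter` wrapper around any `Write`; it is not the crate's actual tracked-writer type, only an illustration of the behavior the diff assumes.

    use std::io::{self, Write};

    /// Hypothetical wrapper that counts the bytes passed through an inner writer,
    /// mirroring what `bytes_written()` in the diff is assumed to provide.
    struct CountingWriter<W: Write> {
        inner: W,
        bytes_written: u64,
    }

    impl<W: Write> CountingWriter<W> {
        fn new(inner: W) -> Self {
            Self { inner, bytes_written: 0 }
        }

        fn bytes_written(&self) -> u64 {
            self.bytes_written
        }
    }

    impl<W: Write> Write for CountingWriter<W> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let written = self.inner.write(buf)?;
            self.bytes_written += written as u64;
            Ok(written)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.inner.flush()
        }
    }

    fn main() -> io::Result<()> {
        let mut out = CountingWriter::new(Vec::<u8>::new());

        // Record the offset before serializing, just as write_bloom_filters
        // records `start_offset` before emitting each bloom filter.
        let start_offset = out.bytes_written();
        out.write_all(b"serialized bloom filter bytes")?;
        let end_offset = out.bytes_written();

        // The start offset is what ends up in `bloom_filter_offset`; the length
        // (end_offset - start_offset) is not recorded by this commit.
        println!("offset = {start_offset}, length = {}", end_offset - start_offset);
        Ok(())
    }
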