You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/15 15:01:44 UTC
[arrow-rs] 03/04: write out to bloom filter
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch add-bloom-filter-3
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
commit 9b8a0f51517b3235ccd57461f439a400dbbee4c1
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Nov 15 21:47:56 2022 +0800
write out to bloom filter
---
parquet/src/bloom_filter/mod.rs | 1 +
parquet/src/column/writer/mod.rs | 15 ++++++++++++++
parquet/src/file/writer.rs | 45 ++++++++++++++++++++++++++++++++++++++--
3 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 4944a93f8..d0bee8a5f 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -80,6 +80,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
}
/// A split block Bloom filter
+#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);
const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3cdf04f54..f8e79d792 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -16,6 +16,9 @@
// under the License.
//! Contains column writer API.
+
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
@@ -154,6 +157,9 @@ pub struct ColumnCloseResult {
pub rows_written: u64,
/// Metadata for this column chunk
pub metadata: ColumnChunkMetaData,
+ /// Optional bloom filter for this column
+ #[cfg(feature = "bloom")]
+ pub bloom_filter: Option<Sbbf>,
/// Optional column index, for filtering
pub column_index: Option<ColumnIndex>,
/// Optional offset index, identifying page locations
@@ -209,6 +215,10 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,
+ // bloom filter
+ #[cfg(feature = "bloom")]
+ bloom_filter: Option<Sbbf>,
+
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
@@ -260,6 +270,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_column_nulls: 0,
column_distinct_count: None,
},
+ // TODO: construct the bloom filter for this column; until then it is always `None`
+ #[cfg(feature = "bloom")]
+ bloom_filter: None,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
@@ -458,6 +471,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Ok(ColumnCloseResult {
bytes_written: self.column_metrics.total_bytes_written,
rows_written: self.column_metrics.total_rows_written,
+ #[cfg(feature = "bloom")]
+ bloom_filter: self.bloom_filter,
metadata,
column_index,
offset_index,
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 2efaf7caf..90c9b6bfc 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,10 +18,11 @@
//! Contains file writer API, and provides methods to write row groups and columns by
//! using row group writers and column writers respectively.
-use std::{io::Write, sync::Arc};
-
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use std::{io::Write, sync::Arc};
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
use crate::basic::PageType;
@@ -116,6 +117,8 @@ pub struct SerializedFileWriter<W: Write> {
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
row_groups: Vec<RowGroupMetaDataPtr>,
+ #[cfg(feature = "bloom")]
+ bloom_filters: Vec<Vec<Option<Sbbf>>>,
column_indexes: Vec<Vec<Option<ColumnIndex>>>,
offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
row_group_index: usize,
@@ -132,6 +135,8 @@ impl<W: Write> SerializedFileWriter<W> {
descr: Arc::new(SchemaDescriptor::new(schema)),
props: properties,
row_groups: vec![],
+ #[cfg(feature = "bloom")]
+ bloom_filters: vec![],
column_indexes: Vec::new(),
offset_indexes: Vec::new(),
row_group_index: 0,
@@ -212,6 +217,32 @@ impl<W: Write> SerializedFileWriter<W> {
Ok(())
}
+    #[cfg(feature = "bloom")]
+    /// Serialize every recorded bloom filter to the file, row group by row group.
+    ///
+    /// Each filter's starting byte offset is stored in its column chunk's metadata.
+    /// Row groups or columns without a recorded filter are skipped rather than indexed.
+    fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        for (row_group, filters) in row_groups.iter_mut().zip(&self.bloom_filters) {
+            for (column_chunk, filter) in row_group.columns.iter_mut().zip(filters) {
+                if let Some(bloom_filter) = filter {
+                    // Capture the position before anything is written for this filter.
+                    let start_offset = self.buf.bytes_written();
+                    let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                    bloom_filter.write_to_out_protocol(&mut protocol)?;
+                    protocol.flush()?;
+                    // `bloom_filter_offset` is a Thrift `ColumnMetaData` field, not `ColumnChunk`.
+                    column_chunk
+                        .meta_data
+                        .as_mut()
+                        .expect("can't have bloom filter without column metadata")
+                        .bloom_filter_offset = Some(start_offset as i64);
+                }
+            }
+        }
+        Ok(())
+    }
+
/// Serialize all the column index to the file
fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
// iter row group
@@ -250,6 +281,8 @@ impl<W: Write> SerializedFileWriter<W> {
.map(|v| v.to_thrift())
.collect::<Vec<_>>();
+ #[cfg(feature = "bloom")]
+ self.write_bloom_filters(&mut row_groups)?;
// Write column indexes and offset indexes
self.write_column_indexes(&mut row_groups)?;
self.write_offset_indexes(&mut row_groups)?;
@@ -320,6 +353,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
column_index: usize,
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaData>,
+ #[cfg(feature = "bloom")]
+ bloom_filters: Vec<Option<Sbbf>>,
column_indexes: Vec<Option<ColumnIndex>>,
offset_indexes: Vec<Option<OffsetIndex>>,
on_close: Option<OnCloseRowGroup<'a>>,
@@ -348,6 +383,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
column_index: 0,
row_group_metadata: None,
column_chunks: Vec::with_capacity(num_columns),
+ #[cfg(feature = "bloom")]
+ bloom_filters: Vec::with_capacity(num_columns),
column_indexes: Vec::with_capacity(num_columns),
offset_indexes: Vec::with_capacity(num_columns),
total_bytes_written: 0,
@@ -380,6 +417,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
let column_chunks = &mut self.column_chunks;
let column_indexes = &mut self.column_indexes;
let offset_indexes = &mut self.offset_indexes;
+ #[cfg(feature = "bloom")]
+ let bloom_filters = &mut self.bloom_filters;
let on_close = |r: ColumnCloseResult| {
// Update row group writer metrics
@@ -387,6 +426,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
column_chunks.push(r.metadata);
column_indexes.push(r.column_index);
offset_indexes.push(r.offset_index);
+ #[cfg(feature = "bloom")]
+ bloom_filters.push(r.bloom_filter);
if let Some(rows) = *total_rows_written {
if rows != r.rows_written {