You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the word "here" in the original page) is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/13 20:12:23 UTC
[arrow-rs] branch master updated: Optimize bulk writing of all blocks of bloom filter (#3340)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2749dcca5 Optimize bulk writing of all blocks of bloom filter (#3340)
2749dcca5 is described below
commit 2749dcca50e6dd0ac72db7fe802552c2db742c3c
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Dec 13 12:12:17 2022 -0800
Optimize bulk writing of all blocks of bloom filter (#3340)
---
parquet/src/bloom_filter/mod.rs | 15 +++++++++------
parquet/src/file/writer.rs | 9 +++++++--
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index e6742aefc..1a561bf16 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -28,7 +28,7 @@ use crate::format::{
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
-use std::io::{BufWriter, Write};
+use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
@@ -220,10 +220,10 @@ impl Sbbf {
Self(data)
}
- /// Write the bloom filter data (header and then bitset) to the output
- pub(crate) fn write<W: Write>(&self, writer: W) -> Result<(), ParquetError> {
- // Use a BufWriter to avoid costs of writing individual blocks
- let mut writer = BufWriter::new(writer);
+ /// Write the bloom filter data (header and then bitset) to the output. This doesn't
+ /// flush the writer in order to boost performance of bulk writing all blocks. Caller
+ /// must remember to flush the writer.
+ pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
let mut protocol = TCompactOutputProtocol::new(&mut writer);
let header = self.header();
header.write_to_out_protocol(&mut protocol).map_err(|e| {
@@ -231,7 +231,6 @@ impl Sbbf {
})?;
protocol.flush()?;
self.write_bitset(&mut writer)?;
- writer.flush()?;
Ok(())
}
@@ -330,6 +329,10 @@ impl Sbbf {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].check(hash as u32)
}
+
+ pub(crate) fn block_num(&self) -> usize {
+ self.0.len()
+ }
}
// per spec we use xxHash with seed=0
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 3f1731687..d92a42a65 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -21,6 +21,7 @@
use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use std::io::BufWriter;
use std::{io::Write, sync::Arc};
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
@@ -225,23 +226,27 @@ impl<W: Write> SerializedFileWriter<W> {
// iter row group
// iter each column
// write bloom filter to the file
+ let mut start_offset = self.buf.bytes_written();
+ let mut writer = BufWriter::new(&mut self.buf);
+
for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
match &self.bloom_filters[row_group_idx][column_idx] {
Some(bloom_filter) => {
- let start_offset = self.buf.bytes_written();
- bloom_filter.write(&mut self.buf)?;
+ bloom_filter.write(&mut writer)?;
// set offset and index for bloom filter
column_chunk
.meta_data
.as_mut()
.expect("can't have bloom filter without column metadata")
.bloom_filter_offset = Some(start_offset as i64);
+ start_offset += bloom_filter.block_num() * 32;
}
None => {}
}
}
}
+ writer.flush()?;
Ok(())
}