You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/13 20:12:23 UTC

[arrow-rs] branch master updated: Optimize bulk writing of all blocks of bloom filter (#3340)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 2749dcca5 Optimize bulk writing of all blocks of bloom filter (#3340)
2749dcca5 is described below

commit 2749dcca50e6dd0ac72db7fe802552c2db742c3c
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Dec 13 12:12:17 2022 -0800

    Optimize bulk writing of all blocks of bloom filter (#3340)
---
 parquet/src/bloom_filter/mod.rs | 15 +++++++++------
 parquet/src/file/writer.rs      |  9 +++++++--
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index e6742aefc..1a561bf16 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -28,7 +28,7 @@ use crate::format::{
 };
 use bytes::{Buf, Bytes};
 use std::hash::Hasher;
-use std::io::{BufWriter, Write};
+use std::io::Write;
 use std::sync::Arc;
 use thrift::protocol::{
     TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
@@ -220,10 +220,10 @@ impl Sbbf {
         Self(data)
     }
 
-    /// Write the bloom filter data (header and then bitset) to the output
-    pub(crate) fn write<W: Write>(&self, writer: W) -> Result<(), ParquetError> {
-        // Use a BufWriter to avoid costs of writing individual blocks
-        let mut writer = BufWriter::new(writer);
+    /// Write the bloom filter data (header and then bitset) to the output. This doesn't
+    /// flush the writer in order to boost performance of bulk writing all blocks. Caller
+    /// must remember to flush the writer.
+    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
         let mut protocol = TCompactOutputProtocol::new(&mut writer);
         let header = self.header();
         header.write_to_out_protocol(&mut protocol).map_err(|e| {
@@ -231,7 +231,6 @@ impl Sbbf {
         })?;
         protocol.flush()?;
         self.write_bitset(&mut writer)?;
-        writer.flush()?;
         Ok(())
     }
 
@@ -330,6 +329,10 @@ impl Sbbf {
         let block_index = self.hash_to_block_index(hash);
         self.0[block_index].check(hash as u32)
     }
+
+    pub(crate) fn block_num(&self) -> usize {
+        self.0.len()
+    }
 }
 
 // per spec we use xxHash with seed=0
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 3f1731687..d92a42a65 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -21,6 +21,7 @@
 use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use std::io::BufWriter;
 use std::{io::Write, sync::Arc};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
 
@@ -225,23 +226,27 @@ impl<W: Write> SerializedFileWriter<W> {
         // iter row group
         // iter each column
         // write bloom filter to the file
+        let mut start_offset = self.buf.bytes_written();
+        let mut writer = BufWriter::new(&mut self.buf);
+
         for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
             for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
                 match &self.bloom_filters[row_group_idx][column_idx] {
                     Some(bloom_filter) => {
-                        let start_offset = self.buf.bytes_written();
-                        bloom_filter.write(&mut self.buf)?;
+                        bloom_filter.write(&mut writer)?;
                         // set offset and index for bloom filter
                         column_chunk
                             .meta_data
                             .as_mut()
                             .expect("can't have bloom filter without column metadata")
                             .bloom_filter_offset = Some(start_offset as i64);
+                        start_offset += bloom_filter.block_num() * 32;
                     }
                     None => {}
                 }
             }
         }
+        writer.flush()?;
         Ok(())
     }