You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/15 15:01:45 UTC
[arrow-rs] 04/04: encode writing
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch add-bloom-filter-3
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
commit 5d7624860d3c8aed11b4ed04b0d35ccbcc1802f2
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Nov 15 23:01:26 2022 +0800
encode writing
---
parquet/src/bloom_filter/mod.rs | 26 ++++++++++++++++++++++++++
parquet/src/file/writer.rs | 16 +++++++++++-----
2 files changed, 37 insertions(+), 5 deletions(-)
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index d0bee8a5f..0122a3a76 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -24,9 +24,11 @@ use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+ SplitBlockAlgorithm, Uncompressed, XxHash,
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
+use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{TCompactInputProtocol, TSerializable};
use twox_hash::XxHash64;
@@ -129,6 +131,30 @@ impl Sbbf {
Self(data)
}
+    /// Serialize this filter's bitset to `writer`.
+    ///
+    /// Blocks are written in order, and every word inside a block is encoded
+    /// as little-endian bytes. Only the raw bits are emitted — no header.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ParquetError::General`] if any write to `writer` fails.
+    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+        self.0.iter().flatten().try_for_each(|word| {
+            writer.write_all(&word.to_le_bytes()).map_err(|err| {
+                ParquetError::General(format!("Could not write bloom filter bit set: {}", err))
+            })
+        })
+    }
+
+    /// Build the thrift [`BloomFilterHeader`] describing this filter.
+    ///
+    /// The header declares the split-block algorithm, xxHash hashing and no
+    /// compression. It also records the bitset size in bytes: 32 bytes per
+    /// block, i.e. 8 words of 4 bytes each.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the bitset size does not fit in the `i32` `num_bytes` field
+    /// of the header. Such a filter cannot be described by the format.
+    pub fn header(&self) -> BloomFilterHeader {
+        // 8 words per block, 4 bytes per word. Compute in `usize` and narrow
+        // with a checked conversion: a bare `as i32` followed by `i32`
+        // multiplication would silently truncate/wrap and write a corrupt
+        // header for very large filters.
+        let num_bytes = i32::try_from(self.0.len() * 4 * 8)
+            .expect("bloom filter bitset too large for BloomFilterHeader::num_bytes (i32)");
+        BloomFilterHeader {
+            num_bytes,
+            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
+            hash: BloomFilterHash::XXHASH(XxHash {}),
+            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
+        }
+    }
+
pub fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
reader: Arc<R>,
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 90c9b6bfc..bf6ec93fa 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -230,11 +230,16 @@ impl<W: Write> SerializedFileWriter<W> {
Some(bloom_filter) => {
let start_offset = self.buf.bytes_written();
let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
- bloom_filter.write_to_out_protocol(&mut protocol)?;
+ let header = bloom_filter.header();
+ header.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
- let end_offset = self.buf.bytes_written();
+ bloom_filter.write_bitset(&mut self.buf)?;
// set offset and index for bloom filter
- column_metadata.bloom_filter_offset = Some(start_offset as i64);
+ column_metadata
+ .meta_data
+ .as_mut()
+ .expect("can't have bloom filter without column metadata")
+ .bloom_filter_offset = Some(start_offset as i64);
}
None => {}
}
@@ -424,10 +429,10 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
// Update row group writer metrics
*total_bytes_written += r.bytes_written;
column_chunks.push(r.metadata);
- column_indexes.push(r.column_index);
- offset_indexes.push(r.offset_index);
#[cfg(feature = "bloom")]
bloom_filters.push(r.bloom_filter);
+ column_indexes.push(r.column_index);
+ offset_indexes.push(r.offset_index);
if let Some(rows) = *total_rows_written {
if rows != r.rows_written {
@@ -663,6 +668,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
Ok(spec)
}
+
fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
metadata