You are viewing a plain text version of this content; the canonical (HTML) version with working links is available in the original mailing-list archive.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/15 15:01:45 UTC

[arrow-rs] 04/04: encode writing

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch add-bloom-filter-3
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 5d7624860d3c8aed11b4ed04b0d35ccbcc1802f2
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Nov 15 23:01:26 2022 +0800

    encode writing
---
 parquet/src/bloom_filter/mod.rs | 26 ++++++++++++++++++++++++++
 parquet/src/file/writer.rs      | 16 +++++++++++-----
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index d0bee8a5f..0122a3a76 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -24,9 +24,11 @@ use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::reader::ChunkReader;
 use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+    SplitBlockAlgorithm, Uncompressed, XxHash,
 };
 use bytes::{Buf, Bytes};
 use std::hash::Hasher;
+use std::io::Write;
 use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;
@@ -129,6 +131,30 @@ impl Sbbf {
         Self(data)
     }
 
+    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+        for block in &self.0 {
+            for word in block {
+                writer.write_all(&word.to_le_bytes()).map_err(|e| {
+                    ParquetError::General(format!(
+                        "Could not write bloom filter bit set: {}",
+                        e
+                    ))
+                })?;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn header(&self) -> BloomFilterHeader {
+        BloomFilterHeader {
+            // 8 i32 per block, 4 bytes per i32
+            num_bytes: self.0.len() as i32 * 4 * 8,
+            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
+            hash: BloomFilterHash::XXHASH(XxHash {}),
+            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
+        }
+    }
+
     pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
         reader: Arc<R>,
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 90c9b6bfc..bf6ec93fa 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -230,11 +230,16 @@ impl<W: Write> SerializedFileWriter<W> {
                     Some(bloom_filter) => {
                         let start_offset = self.buf.bytes_written();
                         let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
-                        bloom_filter.write_to_out_protocol(&mut protocol)?;
+                        let header = bloom_filter.header();
+                        header.write_to_out_protocol(&mut protocol)?;
                         protocol.flush()?;
-                        let end_offset = self.buf.bytes_written();
+                        bloom_filter.write_bitset(&mut self.buf)?;
                         // set offset and index for bloom filter
-                        column_metadata.bloom_filter_offset = Some(start_offset as i64);
+                        column_metadata
+                            .meta_data
+                            .as_mut()
+                            .expect("can't have bloom filter without column metadata")
+                            .bloom_filter_offset = Some(start_offset as i64);
                     }
                     None => {}
                 }
@@ -424,10 +429,10 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             // Update row group writer metrics
             *total_bytes_written += r.bytes_written;
             column_chunks.push(r.metadata);
-            column_indexes.push(r.column_index);
-            offset_indexes.push(r.offset_index);
             #[cfg(feature = "bloom")]
             bloom_filters.push(r.bloom_filter);
+            column_indexes.push(r.column_index);
+            offset_indexes.push(r.offset_index);
 
             if let Some(rows) = *total_rows_written {
                 if rows != r.rows_written {
@@ -663,6 +668,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
 
         Ok(spec)
     }
+
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
         metadata