You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/19 17:29:57 UTC

[arrow-rs] branch master updated: Put BufWriter into TrackedWrite (#3361)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new e2abb4bf5 Put BufWriter into TrackedWrite (#3361)
e2abb4bf5 is described below

commit e2abb4bf5e1e0057a0a2e9fb55934a370b52cb3a
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Mon Dec 19 09:29:52 2022 -0800

    Put BufWriter into TrackedWrite (#3361)
    
    * Put BufWriter into TrackedWrite
    
    * Update benchmark
    
    * Update TrackedWrite doc.
    
    * Update
---
 parquet/benches/arrow_writer.rs  |  9 +++++---
 parquet/src/bloom_filter/mod.rs  |  4 ----
 parquet/src/column/writer/mod.rs | 16 ++++++++++----
 parquet/src/file/writer.rs       | 46 +++++++++++++++++++++++++++-------------
 4 files changed, 49 insertions(+), 26 deletions(-)

diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs
index 676debf5c..a590ceb59 100644
--- a/parquet/benches/arrow_writer.rs
+++ b/parquet/benches/arrow_writer.rs
@@ -17,7 +17,10 @@
 
 #[macro_use]
 extern crate criterion;
+
 use criterion::{Criterion, Throughput};
+use std::env;
+use std::fs::File;
 
 extern crate arrow;
 extern crate parquet;
@@ -312,9 +315,9 @@ fn write_batch_with_option(
     batch: &RecordBatch,
     props: Option<WriterProperties>,
 ) -> Result<()> {
-    // Write batch to an in-memory writer
-    let buffer = vec![];
-    let mut writer = ArrowWriter::try_new(buffer, batch.schema(), props)?;
+    let path = env::temp_dir().join("arrow_writer.temp");
+    let file = File::create(path).unwrap();
+    let mut writer = ArrowWriter::try_new(file, batch.schema(), props)?;
 
     writer.write(batch)?;
     writer.close()?;
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index a6620fc14..e255a8dc1 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -329,10 +329,6 @@ impl Sbbf {
         let block_index = self.hash_to_block_index(hash);
         self.0[block_index].check(hash as u32)
     }
-
-    pub(crate) fn block_num(&self) -> usize {
-        self.0.len()
-    }
 }
 
 // per spec we use xxHash with seed=0
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 1010dc156..fb2449202 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1680,6 +1680,8 @@ mod tests {
         assert_eq!(stats.null_count(), 0);
         assert!(stats.distinct_count().is_none());
 
+        drop(write);
+
         let props = ReaderProperties::builder()
             .set_backward_compatible_lz4(false)
             .build();
@@ -1724,6 +1726,8 @@ mod tests {
         let r = writer.close().unwrap();
         assert!(r.metadata.statistics().is_none());
 
+        drop(write);
+
         let props = ReaderProperties::builder()
             .set_backward_compatible_lz4(false)
             .build();
@@ -1842,8 +1846,8 @@ mod tests {
         // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
         // and no fallback occurred so far.
         let mut file = tempfile::tempfile().unwrap();
-        let mut writer = TrackedWrite::new(&mut file);
-        let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
+        let mut write = TrackedWrite::new(&mut file);
+        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
         let props = Arc::new(
             WriterProperties::builder()
                 .set_data_pagesize_limit(10)
@@ -1855,6 +1859,8 @@ mod tests {
         writer.write_batch(data, None, None).unwrap();
         let r = writer.close().unwrap();
 
+        drop(write);
+
         // Read pages and check the sequence
         let props = ReaderProperties::builder()
             .set_backward_compatible_lz4(false)
@@ -2196,8 +2202,8 @@ mod tests {
         rep_levels: Option<&[i16]>,
     ) {
         let mut file = tempfile::tempfile().unwrap();
-        let mut writer = TrackedWrite::new(&mut file);
-        let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
+        let mut write = TrackedWrite::new(&mut file);
+        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
 
         let max_def_level = match def_levels {
             Some(buf) => *buf.iter().max().unwrap_or(&0i16),
@@ -2228,6 +2234,8 @@ mod tests {
         assert_eq!(values_written, values.len());
         let result = writer.close().unwrap();
 
+        drop(write);
+
         let props = ReaderProperties::builder()
             .set_backward_compatible_lz4(false)
             .build();
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index d92a42a65..a12d5477c 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -21,7 +21,7 @@
 use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
-use std::io::BufWriter;
+use std::io::{BufWriter, IoSlice};
 use std::{io::Write, sync::Arc};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
 
@@ -44,17 +44,19 @@ use crate::schema::types::{
 };
 
 /// A wrapper around a [`Write`] that keeps track of the number
-/// of bytes that have been written
-pub struct TrackedWrite<W> {
-    inner: W,
+/// of bytes that have been written. The given [`Write`] is wrapped
+/// with a [`BufWriter`] to optimize writing performance.
+pub struct TrackedWrite<W: Write> {
+    inner: BufWriter<W>,
     bytes_written: usize,
 }
 
 impl<W: Write> TrackedWrite<W> {
     /// Create a new [`TrackedWrite`] from a [`Write`]
     pub fn new(inner: W) -> Self {
+        let buf_write = BufWriter::new(inner);
         Self {
-            inner,
+            inner: buf_write,
             bytes_written: 0,
         }
     }
@@ -65,8 +67,13 @@ impl<W: Write> TrackedWrite<W> {
     }
 
     /// Returns the underlying writer.
-    pub fn into_inner(self) -> W {
-        self.inner
+    pub fn into_inner(self) -> Result<W> {
+        self.inner.into_inner().map_err(|err| {
+            ParquetError::General(format!(
+                "fail to get inner writer: {:?}",
+                err.to_string()
+            ))
+        })
     }
 }
 
@@ -77,6 +84,19 @@ impl<W: Write> Write for TrackedWrite<W> {
         Ok(bytes)
     }
 
+    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
+        let bytes = self.inner.write_vectored(bufs)?;
+        self.bytes_written += bytes;
+        Ok(bytes)
+    }
+
+    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
+        self.inner.write_all(buf)?;
+        self.bytes_written += buf.len();
+
+        Ok(())
+    }
+
     fn flush(&mut self) -> std::io::Result<()> {
         self.inner.flush()
     }
@@ -226,27 +246,23 @@ impl<W: Write> SerializedFileWriter<W> {
         // iter row group
         // iter each column
         // write bloom filter to the file
-        let mut start_offset = self.buf.bytes_written();
-        let mut writer = BufWriter::new(&mut self.buf);
-
         for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
             for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
                 match &self.bloom_filters[row_group_idx][column_idx] {
                     Some(bloom_filter) => {
-                        bloom_filter.write(&mut writer)?;
+                        let start_offset = self.buf.bytes_written();
+                        bloom_filter.write(&mut self.buf)?;
                         // set offset and index for bloom filter
                         column_chunk
                             .meta_data
                             .as_mut()
                             .expect("can't have bloom filter without column metadata")
                             .bloom_filter_offset = Some(start_offset as i64);
-                        start_offset += bloom_filter.block_num() * 32;
                     }
                     None => {}
                 }
             }
         }
-        writer.flush()?;
         Ok(())
     }
 
@@ -336,7 +352,7 @@ impl<W: Write> SerializedFileWriter<W> {
         self.assert_previous_writer_closed()?;
         let _ = self.write_metadata()?;
 
-        Ok(self.buf.into_inner())
+        self.buf.into_inner()
     }
 }
 
@@ -558,7 +574,7 @@ impl<'a> SerializedColumnWriter<'a> {
 /// Writes and serializes pages and metadata into output stream.
 ///
 /// `SerializedPageWriter` should not be used after calling `close()`.
-pub struct SerializedPageWriter<'a, W> {
+pub struct SerializedPageWriter<'a, W: Write> {
     sink: &'a mut TrackedWrite<W>,
 }