You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/19 17:29:57 UTC
[arrow-rs] branch master updated: Put BufWriter into TrackedWrite (#3361)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new e2abb4bf5 Put BufWriter into TrackedWrite (#3361)
e2abb4bf5 is described below
commit e2abb4bf5e1e0057a0a2e9fb55934a370b52cb3a
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Mon Dec 19 09:29:52 2022 -0800
Put BufWriter into TrackedWrite (#3361)
* Put BufWriter into TrackedWrite
* Update benchmark
* Update TrackedWrite doc.
* Update
---
parquet/benches/arrow_writer.rs | 9 +++++---
parquet/src/bloom_filter/mod.rs | 4 ----
parquet/src/column/writer/mod.rs | 16 ++++++++++----
parquet/src/file/writer.rs | 46 +++++++++++++++++++++++++++-------------
4 files changed, 49 insertions(+), 26 deletions(-)
diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs
index 676debf5c..a590ceb59 100644
--- a/parquet/benches/arrow_writer.rs
+++ b/parquet/benches/arrow_writer.rs
@@ -17,7 +17,10 @@
#[macro_use]
extern crate criterion;
+
use criterion::{Criterion, Throughput};
+use std::env;
+use std::fs::File;
extern crate arrow;
extern crate parquet;
@@ -312,9 +315,9 @@ fn write_batch_with_option(
batch: &RecordBatch,
props: Option<WriterProperties>,
) -> Result<()> {
- // Write batch to an in-memory writer
- let buffer = vec![];
- let mut writer = ArrowWriter::try_new(buffer, batch.schema(), props)?;
+ let path = env::temp_dir().join("arrow_writer.temp");
+ let file = File::create(path).unwrap();
+ let mut writer = ArrowWriter::try_new(file, batch.schema(), props)?;
writer.write(batch)?;
writer.close()?;
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index a6620fc14..e255a8dc1 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -329,10 +329,6 @@ impl Sbbf {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].check(hash as u32)
}
-
- pub(crate) fn block_num(&self) -> usize {
- self.0.len()
- }
}
// per spec we use xxHash with seed=0
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 1010dc156..fb2449202 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1680,6 +1680,8 @@ mod tests {
assert_eq!(stats.null_count(), 0);
assert!(stats.distinct_count().is_none());
+ drop(write);
+
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
@@ -1724,6 +1726,8 @@ mod tests {
let r = writer.close().unwrap();
assert!(r.metadata.statistics().is_none());
+ drop(write);
+
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
@@ -1842,8 +1846,8 @@ mod tests {
// ARROW-5129: Test verifies that we add data page in case of dictionary encoding
// and no fallback occurred so far.
let mut file = tempfile::tempfile().unwrap();
- let mut writer = TrackedWrite::new(&mut file);
- let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
+ let mut write = TrackedWrite::new(&mut file);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut write));
let props = Arc::new(
WriterProperties::builder()
.set_data_pagesize_limit(10)
@@ -1855,6 +1859,8 @@ mod tests {
writer.write_batch(data, None, None).unwrap();
let r = writer.close().unwrap();
+ drop(write);
+
// Read pages and check the sequence
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
@@ -2196,8 +2202,8 @@ mod tests {
rep_levels: Option<&[i16]>,
) {
let mut file = tempfile::tempfile().unwrap();
- let mut writer = TrackedWrite::new(&mut file);
- let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
+ let mut write = TrackedWrite::new(&mut file);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut write));
let max_def_level = match def_levels {
Some(buf) => *buf.iter().max().unwrap_or(&0i16),
@@ -2228,6 +2234,8 @@ mod tests {
assert_eq!(values_written, values.len());
let result = writer.close().unwrap();
+ drop(write);
+
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index d92a42a65..a12d5477c 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -21,7 +21,7 @@
use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
-use std::io::BufWriter;
+use std::io::{BufWriter, IoSlice};
use std::{io::Write, sync::Arc};
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
@@ -44,17 +44,19 @@ use crate::schema::types::{
};
/// A wrapper around a [`Write`] that keeps track of the number
-/// of bytes that have been written
-pub struct TrackedWrite<W> {
- inner: W,
+/// of bytes that have been written. The given [`Write`] is wrapped
+/// with a [`BufWriter`] to optimize writing performance.
+pub struct TrackedWrite<W: Write> {
+ inner: BufWriter<W>,
bytes_written: usize,
}
impl<W: Write> TrackedWrite<W> {
/// Create a new [`TrackedWrite`] from a [`Write`]
pub fn new(inner: W) -> Self {
+ let buf_write = BufWriter::new(inner);
Self {
- inner,
+ inner: buf_write,
bytes_written: 0,
}
}
@@ -65,8 +67,13 @@ impl<W: Write> TrackedWrite<W> {
}
/// Returns the underlying writer.
- pub fn into_inner(self) -> W {
- self.inner
+ pub fn into_inner(self) -> Result<W> {
+ self.inner.into_inner().map_err(|err| {
+ ParquetError::General(format!(
+ "fail to get inner writer: {:?}",
+ err.to_string()
+ ))
+ })
}
}
@@ -77,6 +84,19 @@ impl<W: Write> Write for TrackedWrite<W> {
Ok(bytes)
}
+ fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
+ let bytes = self.inner.write_vectored(bufs)?;
+ self.bytes_written += bytes;
+ Ok(bytes)
+ }
+
+ fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
+ self.inner.write_all(buf)?;
+ self.bytes_written += buf.len();
+
+ Ok(())
+ }
+
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
@@ -226,27 +246,23 @@ impl<W: Write> SerializedFileWriter<W> {
// iter row group
// iter each column
// write bloom filter to the file
- let mut start_offset = self.buf.bytes_written();
- let mut writer = BufWriter::new(&mut self.buf);
-
for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
match &self.bloom_filters[row_group_idx][column_idx] {
Some(bloom_filter) => {
- bloom_filter.write(&mut writer)?;
+ let start_offset = self.buf.bytes_written();
+ bloom_filter.write(&mut self.buf)?;
// set offset and index for bloom filter
column_chunk
.meta_data
.as_mut()
.expect("can't have bloom filter without column metadata")
.bloom_filter_offset = Some(start_offset as i64);
- start_offset += bloom_filter.block_num() * 32;
}
None => {}
}
}
}
- writer.flush()?;
Ok(())
}
@@ -336,7 +352,7 @@ impl<W: Write> SerializedFileWriter<W> {
self.assert_previous_writer_closed()?;
let _ = self.write_metadata()?;
- Ok(self.buf.into_inner())
+ self.buf.into_inner()
}
}
@@ -558,7 +574,7 @@ impl<'a> SerializedColumnWriter<'a> {
/// Writes and serializes pages and metadata into output stream.
///
/// `SerializedPageWriter` should not be used after calling `close()`.
-pub struct SerializedPageWriter<'a, W> {
+pub struct SerializedPageWriter<'a, W: Write> {
sink: &'a mut TrackedWrite<W>,
}