Posted to commits@arrow.apache.org by tu...@apache.org on 2022/05/25 08:33:18 UTC
[arrow-rs] branch master updated: Change parquet writers to use standard `std::io::Write` rather than custom `ParquetWriter` trait (#1717) (#1163) (#1719)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 722fcfcf2 Change parquet writers to use standard `std::io::Write` rather than custom `ParquetWriter` trait (#1717) (#1163) (#1719)
722fcfcf2 is described below
commit 722fcfcf2f55672c2bae626e3f652a3a792dff13
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed May 25 09:33:12 2022 +0100
Change parquet writers to use standard `std::io::Write` rather than custom `ParquetWriter` trait (#1717) (#1163) (#1719)
* Rustify parquet writer (#1717) (#1163)
* Fix parquet_derive
* Fix benches
* Fix parquet_derive tests
* Use raw vec instead of Cursor
* Review feedback
* Fix unnecessary unwrap
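For reference, a minimal sketch of the new low-level write path; the schema,
column name, and values below are illustrative, not taken from this change:

    use std::sync::Arc;
    use parquet::data_type::Int32Type;
    use parquet::file::{properties::WriterProperties, writer::SerializedFileWriter};
    use parquet::schema::parser::parse_message_type;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Hypothetical single-column schema
        let schema = Arc::new(parse_message_type(
            "message schema { REQUIRED INT32 id; }",
        )?);
        let props = Arc::new(WriterProperties::builder().build());

        // Any std::io::Write sink works, e.g. an in-memory Vec<u8>
        let mut buffer: Vec<u8> = vec![];
        let mut writer = SerializedFileWriter::new(&mut buffer, schema, props)?;

        let mut row_group_writer = writer.next_row_group()?;
        while let Some(mut col_writer) = row_group_writer.next_column()? {
            col_writer
                .typed::<Int32Type>()
                .write_batch(&[1, 2, 3], None, None)?;
            // Writers are closed explicitly instead of being handed back to the parent
            col_writer.close()?;
        }
        row_group_writer.close()?;
        writer.close()?;
        Ok(())
    }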
---
parquet/benches/arrow_writer.rs | 8 +-
parquet/src/arrow/array_reader/list_array.rs | 2 +-
parquet/src/arrow/arrow_reader.rs | 39 +-
parquet/src/arrow/arrow_writer.rs | 44 +-
parquet/src/arrow/schema.rs | 4 +-
parquet/src/column/mod.rs | 22 +-
parquet/src/column/writer.rs | 76 ++--
parquet/src/data_type.rs | 32 +-
parquet/src/file/mod.rs | 6 +-
parquet/src/file/writer.rs | 633 ++++++++++++---------------
parquet/src/record/record_writer.rs | 6 +-
parquet/src/util/cursor.rs | 8 +-
parquet/src/util/io.rs | 10 +-
parquet/tests/boolean_writer.rs | 25 +-
parquet/tests/custom_writer.rs | 100 -----
parquet_derive/src/lib.rs | 6 +-
parquet_derive/src/parquet_field.rs | 24 +-
parquet_derive_test/src/lib.rs | 7 +-
18 files changed, 448 insertions(+), 604 deletions(-)
diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs
index f1154eb9e..25ff1ca90 100644
--- a/parquet/benches/arrow_writer.rs
+++ b/parquet/benches/arrow_writer.rs
@@ -26,9 +26,7 @@ use std::sync::Arc;
use arrow::datatypes::*;
use arrow::{record_batch::RecordBatch, util::data_gen::*};
-use parquet::{
- arrow::ArrowWriter, errors::Result, file::writer::InMemoryWriteableCursor,
-};
+use parquet::{arrow::ArrowWriter, errors::Result};
fn create_primitive_bench_batch(
size: usize,
@@ -278,8 +276,8 @@ fn _create_nested_bench_batch(
#[inline]
fn write_batch(batch: &RecordBatch) -> Result<()> {
// Write batch to an in-memory writer
- let cursor = InMemoryWriteableCursor::default();
- let mut writer = ArrowWriter::try_new(cursor, batch.schema(), None)?;
+ let buffer = vec![];
+ let mut writer = ArrowWriter::try_new(buffer, batch.schema(), None)?;
writer.write(batch)?;
writer.close()?;
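A minimal sketch of the corresponding high-level usage, writing a RecordBatch
straight into a Vec<u8>; the schema and batch are illustrative:

    use std::sync::Arc;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        // Vec<u8> implements std::io::Write, so no cursor wrapper is required
        let mut buffer: Vec<u8> = vec![];
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)?;
        writer.write(&batch)?;
        writer.close()?;

        assert!(!buffer.is_empty());
        Ok(())
    }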
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 2d199f69e..808f815e6 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -564,7 +564,7 @@ mod tests {
.set_max_row_group_size(200)
.build();
- let mut writer = ArrowWriter::try_new(
+ let writer = ArrowWriter::try_new(
file.try_clone().unwrap(),
Arc::new(arrow_schema),
Some(props),
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index e3a1d1233..34a14f372 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -263,7 +263,6 @@ mod tests {
use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
use crate::arrow::{ArrowWriter, ProjectionMask};
use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
- use crate::column::writer::get_typed_column_writer_mut;
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
FixedLenByteArrayType, Int32Type, Int64Type,
@@ -271,7 +270,7 @@ mod tests {
use crate::errors::Result;
use crate::file::properties::{WriterProperties, WriterVersion};
use crate::file::reader::{FileReader, SerializedFileReader};
- use crate::file::writer::{FileWriter, SerializedFileWriter};
+ use crate::file::writer::SerializedFileWriter;
use crate::schema::parser::parse_message_type;
use crate::schema::types::{Type, TypePtr};
use crate::util::cursor::SliceableCursor;
@@ -936,21 +935,24 @@ mod tests {
for (idx, v) in values.iter().enumerate() {
let def_levels = def_levels.map(|d| d[idx].as_slice());
let mut row_group_writer = writer.next_row_group()?;
- let mut column_writer = row_group_writer
- .next_column()?
- .expect("Column writer is none!");
+ {
+ let mut column_writer = row_group_writer
+ .next_column()?
+ .expect("Column writer is none!");
- get_typed_column_writer_mut::<T>(&mut column_writer)
- .write_batch(v, def_levels, None)?;
+ column_writer
+ .typed::<T>()
+ .write_batch(v, def_levels, None)?;
- row_group_writer.close_column(column_writer)?;
- writer.close_row_group(row_group_writer)?
+ column_writer.close()?;
+ }
+ row_group_writer.close()?;
}
writer.close()
}
- fn get_test_reader(file_name: &str) -> Arc<dyn FileReader> {
+ fn get_test_reader(file_name: &str) -> Arc<SerializedFileReader<File>> {
let file = get_test_file(file_name);
let reader =
@@ -1094,15 +1096,18 @@ mod tests {
)
.unwrap();
- let mut row_group_writer = writer.next_row_group().unwrap();
- let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
+ {
+ let mut row_group_writer = writer.next_row_group().unwrap();
+ let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
- get_typed_column_writer_mut::<Int32Type>(&mut column_writer)
- .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
- .unwrap();
+ column_writer
+ .typed::<Int32Type>()
+ .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
+ .unwrap();
- row_group_writer.close_column(column_writer).unwrap();
- writer.close_row_group(row_group_writer).unwrap();
+ column_writer.close().unwrap();
+ row_group_writer.close().unwrap();
+ }
writer.close().unwrap();
}
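A sketch of the pattern this test now uses: writing an OPTIONAL column with
definition levels through the new typed() accessor; the schema is illustrative:

    use std::sync::Arc;
    use parquet::data_type::Int32Type;
    use parquet::file::{properties::WriterProperties, writer::SerializedFileWriter};
    use parquet::schema::parser::parse_message_type;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Hypothetical schema with a single nullable column
        let schema = Arc::new(parse_message_type(
            "message schema { OPTIONAL INT32 leaf; }",
        )?);
        let props = Arc::new(WriterProperties::builder().build());
        let buffer: Vec<u8> = vec![];
        let mut writer = SerializedFileWriter::new(buffer, schema, props)?;

        let mut row_group_writer = writer.next_row_group()?;
        let mut column_writer = row_group_writer.next_column()?.expect("one column");

        // Four logical slots, two of them null: definition level 0 marks a
        // null, level 1 a present value
        column_writer
            .typed::<Int32Type>()
            .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)?;

        column_writer.close()?;
        row_group_writer.close()?;
        writer.close()?;
        Ok(())
    }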
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index 1918c9675..e1fd93a9b 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -18,6 +18,7 @@
//! Contains writer which writes arrow data into parquet data.
use std::collections::VecDeque;
+use std::io::Write;
use std::sync::Arc;
use arrow::array as arrow_array;
@@ -35,10 +36,8 @@ use super::schema::{
use crate::column::writer::ColumnWriter;
use crate::errors::{ParquetError, Result};
use crate::file::properties::WriterProperties;
-use crate::{
- data_type::*,
- file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
-};
+use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
+use crate::{data_type::*, file::writer::SerializedFileWriter};
/// Arrow writer
///
@@ -46,7 +45,7 @@ use crate::{
/// to produce row groups with `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, leading the final row group in the output file to potentially
/// contain fewer than `max_row_group_size` rows
-pub struct ArrowWriter<W: ParquetWriter> {
+pub struct ArrowWriter<W: Write> {
/// Underlying Parquet writer
writer: SerializedFileWriter<W>,
@@ -65,7 +64,7 @@ pub struct ArrowWriter<W: ParquetWriter> {
max_row_group_size: usize,
}
-impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+impl<W: Write> ArrowWriter<W> {
/// Try to create a new Arrow writer
///
/// The writer will fail if:
@@ -185,17 +184,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
})
.collect();
- write_leaves(row_group_writer.as_mut(), &arrays, &mut levels)?;
+ write_leaves(&mut row_group_writer, &arrays, &mut levels)?;
}
- self.writer.close_row_group(row_group_writer)?;
+ row_group_writer.close()?;
self.buffered_rows -= num_rows;
Ok(())
}
/// Close and finalize the underlying Parquet writer
- pub fn close(&mut self) -> Result<parquet_format::FileMetaData> {
+ pub fn close(mut self) -> Result<parquet_format::FileMetaData> {
self.flush()?;
self.writer.close()
}
@@ -203,15 +202,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
/// Convenience method to get the next ColumnWriter from the RowGroupWriter
#[inline]
-fn get_col_writer(row_group_writer: &mut dyn RowGroupWriter) -> Result<ColumnWriter> {
+fn get_col_writer<'a, W: Write>(
+ row_group_writer: &'a mut SerializedRowGroupWriter<'_, W>,
+) -> Result<SerializedColumnWriter<'a>> {
let col_writer = row_group_writer
.next_column()?
.expect("Unable to get column writer");
Ok(col_writer)
}
-fn write_leaves(
- row_group_writer: &mut dyn RowGroupWriter,
+fn write_leaves<W: Write>(
+ row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
arrays: &[ArrayRef],
levels: &mut [Vec<LevelInfo>],
) -> Result<()> {
@@ -250,12 +251,12 @@ fn write_leaves(
let mut col_writer = get_col_writer(row_group_writer)?;
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
write_leaf(
- &mut col_writer,
+ col_writer.untyped(),
array,
levels.pop().expect("Levels exhausted"),
)?;
}
- row_group_writer.close_column(col_writer)?;
+ col_writer.close()?;
Ok(())
}
ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
@@ -313,12 +314,12 @@ fn write_leaves(
// cast dictionary to a primitive
let array = arrow::compute::cast(array, value_type)?;
write_leaf(
- &mut col_writer,
+ col_writer.untyped(),
&array,
levels.pop().expect("Levels exhausted"),
)?;
}
- row_group_writer.close_column(col_writer)?;
+ col_writer.close()?;
Ok(())
}
ArrowDataType::Float16 => Err(ParquetError::ArrowError(
@@ -336,8 +337,8 @@ fn write_leaves(
}
fn write_leaf(
- writer: &mut ColumnWriter,
- column: &arrow_array::ArrayRef,
+ writer: &mut ColumnWriter<'_>,
+ column: &ArrayRef,
levels: LevelInfo,
) -> Result<i64> {
let indices = levels.filter_array_indices();
@@ -705,7 +706,6 @@ mod tests {
use crate::file::{
reader::{FileReader, SerializedFileReader},
statistics::Statistics,
- writer::InMemoryWriteableCursor,
};
#[test]
@@ -744,16 +744,14 @@ mod tests {
let expected_batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
- let cursor = InMemoryWriteableCursor::default();
+ let mut buffer = vec![];
{
- let mut writer = ArrowWriter::try_new(cursor.clone(), schema, None).unwrap();
+ let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
writer.write(&expected_batch).unwrap();
writer.close().unwrap();
}
- let buffer = cursor.into_inner().unwrap();
-
let cursor = crate::file::serialized_reader::SliceableCursor::new(buffer);
let reader = SerializedFileReader::new(cursor).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
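A rough sketch of the write-then-read round trip this test exercises, assuming
the SliceableCursor-based read path shown above; the schema and batch are
illustrative:

    use std::sync::Arc;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
    use parquet::file::reader::SerializedFileReader;
    use parquet::file::serialized_reader::SliceableCursor;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        // Write into a plain Vec<u8>
        let mut buffer: Vec<u8> = vec![];
        {
            let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)?;
            writer.write(&batch)?;
            writer.close()?;
        }

        // Read the bytes back and compare
        let reader = SerializedFileReader::new(SliceableCursor::new(buffer))?;
        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
        let read_batch = arrow_reader.get_record_reader(1024)?.next().unwrap()?;
        assert_eq!(read_batch, batch);
        Ok(())
    }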
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index 820aa7e7a..5416e4078 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -1591,7 +1591,7 @@ mod tests {
// write to an empty parquet file so that schema is serialized
let file = tempfile::tempfile().unwrap();
- let mut writer = ArrowWriter::try_new(
+ let writer = ArrowWriter::try_new(
file.try_clone().unwrap(),
Arc::new(schema.clone()),
None,
@@ -1660,7 +1660,7 @@ mod tests {
// write to an empty parquet file so that schema is serialized
let file = tempfile::tempfile().unwrap();
- let mut writer = ArrowWriter::try_new(
+ let writer = ArrowWriter::try_new(
file.try_clone().unwrap(),
Arc::new(schema.clone()),
None,
diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs
index 7ed7bfc25..93a4f00d2 100644
--- a/parquet/src/column/mod.rs
+++ b/parquet/src/column/mod.rs
@@ -40,10 +40,11 @@
//!
//! use parquet::{
//! column::{reader::ColumnReader, writer::ColumnWriter},
+//! data_type::Int32Type,
//! file::{
//! properties::WriterProperties,
//! reader::{FileReader, SerializedFileReader},
-//! writer::{FileWriter, SerializedFileWriter},
+//! writer::SerializedFileWriter,
//! },
//! schema::parser::parse_message_type,
//! };
@@ -65,20 +66,17 @@
//! let props = Arc::new(WriterProperties::builder().build());
//! let file = fs::File::create(path).unwrap();
//! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
+//!
//! let mut row_group_writer = writer.next_row_group().unwrap();
//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
-//! match col_writer {
-//! // You can also use `get_typed_column_writer` method to extract typed writer.
-//! ColumnWriter::Int32ColumnWriter(ref mut typed_writer) => {
-//! typed_writer
-//! .write_batch(&[1, 2, 3], Some(&[3, 3, 3, 2, 2]), Some(&[0, 1, 0, 1, 1]))
-//! .unwrap();
-//! }
-//! _ => {}
-//! }
-//! row_group_writer.close_column(col_writer).unwrap();
+//! col_writer
+//! .typed::<Int32Type>()
+//! .write_batch(&[1, 2, 3], Some(&[3, 3, 3, 2, 2]), Some(&[0, 1, 0, 1, 1]))
+//! .unwrap();
+//! col_writer.close().unwrap();
//! }
-//! writer.close_row_group(row_group_writer).unwrap();
+//! row_group_writer.close().unwrap();
+//!
//! writer.close().unwrap();
//!
//! // Reading data using column reader API.
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index a7d0ba8fc..d80cafe0e 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -39,15 +39,15 @@ use crate::util::bit_util::FromBytes;
use crate::util::memory::ByteBufferPtr;
/// Column writer for a Parquet type.
-pub enum ColumnWriter {
- BoolColumnWriter(ColumnWriterImpl<BoolType>),
- Int32ColumnWriter(ColumnWriterImpl<Int32Type>),
- Int64ColumnWriter(ColumnWriterImpl<Int64Type>),
- Int96ColumnWriter(ColumnWriterImpl<Int96Type>),
- FloatColumnWriter(ColumnWriterImpl<FloatType>),
- DoubleColumnWriter(ColumnWriterImpl<DoubleType>),
- ByteArrayColumnWriter(ColumnWriterImpl<ByteArrayType>),
- FixedLenByteArrayColumnWriter(ColumnWriterImpl<FixedLenByteArrayType>),
+pub enum ColumnWriter<'a> {
+ BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
+ Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
+ Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
+ Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
+ FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
+ DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
+ ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
+ FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
pub enum Level {
@@ -76,11 +76,11 @@ macro_rules! gen_stats_section {
}
/// Gets a specific column writer corresponding to column descriptor `descr`.
-pub fn get_column_writer(
+pub fn get_column_writer<'a>(
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
- page_writer: Box<dyn PageWriter>,
-) -> ColumnWriter {
+ page_writer: Box<dyn PageWriter + 'a>,
+) -> ColumnWriter<'a> {
match descr.physical_type() {
Type::BOOLEAN => ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(
descr,
@@ -139,9 +139,9 @@ pub fn get_typed_column_writer<T: DataType>(
}
/// Similar to `get_typed_column_writer` but returns a reference.
-pub fn get_typed_column_writer_ref<T: DataType>(
- col_writer: &ColumnWriter,
-) -> &ColumnWriterImpl<T> {
+pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
+ col_writer: &'b ColumnWriter<'a>,
+) -> &'b ColumnWriterImpl<'a, T> {
T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
panic!(
"Failed to convert column writer into a typed column writer for `{}` type",
@@ -151,9 +151,9 @@ pub fn get_typed_column_writer_ref<T: DataType>(
}
/// Similar to `get_typed_column_writer` but returns a reference.
-pub fn get_typed_column_writer_mut<T: DataType>(
- col_writer: &mut ColumnWriter,
-) -> &mut ColumnWriterImpl<T> {
+pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
+ col_writer: &'a mut ColumnWriter<'b>,
+) -> &'a mut ColumnWriterImpl<'b, T> {
T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
panic!(
"Failed to convert column writer into a typed column writer for `{}` type",
@@ -163,11 +163,11 @@ pub fn get_typed_column_writer_mut<T: DataType>(
}
/// Typed column writer for a primitive column.
-pub struct ColumnWriterImpl<T: DataType> {
+pub struct ColumnWriterImpl<'a, T: DataType> {
// Column writer properties
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
- page_writer: Box<dyn PageWriter>,
+ page_writer: Box<dyn PageWriter + 'a>,
has_dictionary: bool,
dict_encoder: Option<DictEncoder<T>>,
encoder: Box<dyn Encoder<T>>,
@@ -200,11 +200,11 @@ pub struct ColumnWriterImpl<T: DataType> {
_phantom: PhantomData<T>,
}
-impl<T: DataType> ColumnWriterImpl<T> {
+impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
pub fn new(
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
- page_writer: Box<dyn PageWriter>,
+ page_writer: Box<dyn PageWriter + 'a>,
) -> Self {
let codec = props.compression(descr.path());
let compressor = create_codec(codec).unwrap();
@@ -1140,15 +1140,13 @@ mod tests {
page::PageReader,
reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
};
+ use crate::file::writer::TrackedWrite;
use crate::file::{
properties::WriterProperties, reader::SerializedPageReader,
writer::SerializedPageWriter,
};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
- use crate::util::{
- io::{FileSink, FileSource},
- test_common::random_numbers_range,
- };
+ use crate::util::{io::FileSource, test_common::random_numbers_range};
use super::*;
@@ -1825,9 +1823,9 @@ mod tests {
fn test_column_writer_add_data_pages_with_dict() {
// ARROW-5129: Test verifies that we add data page in case of dictionary encoding
// and no fallback occurred so far.
- let file = tempfile::tempfile().unwrap();
- let sink = FileSink::new(&file);
- let page_writer = Box::new(SerializedPageWriter::new(sink));
+ let mut file = tempfile::tempfile().unwrap();
+ let mut writer = TrackedWrite::new(&mut file);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
let props = Arc::new(
WriterProperties::builder()
.set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes
@@ -2120,9 +2118,9 @@ mod tests {
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
) {
- let file = tempfile::tempfile().unwrap();
- let sink = FileSink::new(&file);
- let page_writer = Box::new(SerializedPageWriter::new(sink));
+ let mut file = tempfile::tempfile().unwrap();
+ let mut writer = TrackedWrite::new(&mut file);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
let max_def_level = match def_levels {
Some(buf) => *buf.iter().max().unwrap_or(&0i16),
@@ -2257,12 +2255,12 @@ mod tests {
}
/// Returns column writer.
- fn get_test_column_writer<T: DataType>(
- page_writer: Box<dyn PageWriter>,
+ fn get_test_column_writer<'a, T: DataType>(
+ page_writer: Box<dyn PageWriter + 'a>,
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
- ) -> ColumnWriterImpl<T> {
+ ) -> ColumnWriterImpl<'a, T> {
let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
let column_writer = get_column_writer(descr, props, page_writer);
get_typed_column_writer::<T>(column_writer)
@@ -2343,7 +2341,7 @@ mod tests {
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
- ) -> ColumnWriterImpl<T> {
+ ) -> ColumnWriterImpl<'static, T> {
let descr = Arc::new(get_test_decimals_column_descr::<T>(
max_def_level,
max_rep_level,
@@ -2386,12 +2384,12 @@ mod tests {
}
/// Returns column writer for UINT32 Column provided as ConvertedType only
- fn get_test_unsigned_int_given_as_converted_column_writer<T: DataType>(
- page_writer: Box<dyn PageWriter>,
+ fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
+ page_writer: Box<dyn PageWriter + 'a>,
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
- ) -> ColumnWriterImpl<T> {
+ ) -> ColumnWriterImpl<'a, T> {
let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
max_def_level,
max_rep_level,
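A minimal sketch of the borrow-based page writer setup these tests now use;
it assumes the tempfile dev-dependency used elsewhere in this crate:

    use parquet::file::writer::{SerializedPageWriter, TrackedWrite};

    fn main() -> std::io::Result<()> {
        let mut file = tempfile::tempfile()?;
        let mut tracked = TrackedWrite::new(&mut file);

        // The page writer borrows the tracked sink for its lifetime, rather
        // than cloning the underlying file as FileSink did
        let page_writer = SerializedPageWriter::new(&mut tracked);
        drop(page_writer);

        // Once the borrow ends, the sink is usable again
        assert_eq!(tracked.bytes_written(), 0);
        Ok(())
    }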
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 28645a262..c01fb1530 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -1037,19 +1037,21 @@ pub trait DataType: 'static + Send {
where
Self: Sized;
- fn get_column_writer(column_writer: ColumnWriter) -> Option<ColumnWriterImpl<Self>>
+ fn get_column_writer(
+ column_writer: ColumnWriter<'_>,
+ ) -> Option<ColumnWriterImpl<'_, Self>>
where
Self: Sized;
- fn get_column_writer_ref(
- column_writer: &ColumnWriter,
- ) -> Option<&ColumnWriterImpl<Self>>
+ fn get_column_writer_ref<'a, 'b: 'a>(
+ column_writer: &'b ColumnWriter<'a>,
+ ) -> Option<&'b ColumnWriterImpl<'a, Self>>
where
Self: Sized;
- fn get_column_writer_mut(
- column_writer: &mut ColumnWriter,
- ) -> Option<&mut ColumnWriterImpl<Self>>
+ fn get_column_writer_mut<'a, 'b: 'a>(
+ column_writer: &'a mut ColumnWriter<'b>,
+ ) -> Option<&'a mut ColumnWriterImpl<'b, Self>>
where
Self: Sized;
}
@@ -1094,26 +1096,26 @@ macro_rules! make_type {
}
fn get_column_writer(
- column_writer: ColumnWriter,
- ) -> Option<ColumnWriterImpl<Self>> {
+ column_writer: ColumnWriter<'_>,
+ ) -> Option<ColumnWriterImpl<'_, Self>> {
match column_writer {
ColumnWriter::$writer_ident(w) => Some(w),
_ => None,
}
}
- fn get_column_writer_ref(
- column_writer: &ColumnWriter,
- ) -> Option<&ColumnWriterImpl<Self>> {
+ fn get_column_writer_ref<'a, 'b: 'a>(
+ column_writer: &'a ColumnWriter<'b>,
+ ) -> Option<&'a ColumnWriterImpl<'b, Self>> {
match column_writer {
ColumnWriter::$writer_ident(w) => Some(w),
_ => None,
}
}
- fn get_column_writer_mut(
- column_writer: &mut ColumnWriter,
- ) -> Option<&mut ColumnWriterImpl<Self>> {
+ fn get_column_writer_mut<'a, 'b: 'a>(
+ column_writer: &'a mut ColumnWriter<'b>,
+ ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> {
match column_writer {
ColumnWriter::$writer_ident(w) => Some(w),
_ => None,
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index 78fb7ef11..d293dc773 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -32,7 +32,7 @@
//! use parquet::{
//! file::{
//! properties::WriterProperties,
-//! writer::{FileWriter, SerializedFileWriter},
+//! writer::SerializedFileWriter,
//! },
//! schema::parser::parse_message_type,
//! };
@@ -51,9 +51,9 @@
//! let mut row_group_writer = writer.next_row_group().unwrap();
//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
//! // ... write values to a column writer
-//! row_group_writer.close_column(col_writer).unwrap();
+//! col_writer.close().unwrap()
//! }
-//! writer.close_row_group(row_group_writer).unwrap();
+//! row_group_writer.close().unwrap();
//! writer.close().unwrap();
//!
//! let bytes = fs::read(&path).unwrap();
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index c6d0d1066..646550dcb 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,176 +18,176 @@
//! Contains file writer API, and provides methods to write row groups and columns by
//! using row group writers and column writers respectively.
-use std::{
- io::{Seek, SeekFrom, Write},
- sync::Arc,
-};
+use std::{io::Write, sync::Arc};
use byteorder::{ByteOrder, LittleEndian};
use parquet_format as parquet;
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use crate::basic::PageType;
+use crate::column::writer::{get_typed_column_writer_mut, ColumnWriterImpl};
use crate::column::{
page::{CompressedPage, Page, PageWriteSpec, PageWriter},
writer::{get_column_writer, ColumnWriter},
};
+use crate::data_type::DataType;
use crate::errors::{ParquetError, Result};
use crate::file::{
metadata::*, properties::WriterPropertiesPtr,
statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC,
};
use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr};
-use crate::util::io::{FileSink, Position};
-
-// Exposed publically so client code can implement [`ParquetWriter`]
-pub use crate::util::io::TryClone;
+use crate::util::io::TryClone;
-// Exposed publically for convenience of writing Parquet to a buffer of bytes
-pub use crate::util::cursor::InMemoryWriteableCursor;
+/// A wrapper around a [`Write`] that keeps track of the number
+/// of bytes that have been written
+pub struct TrackedWrite<W> {
+ inner: W,
+ bytes_written: usize,
+}
-// ----------------------------------------------------------------------
-// APIs for file & row group writers
+impl<W: Write> TrackedWrite<W> {
+ /// Create a new [`TrackedWrite`] from a [`Write`]
+ pub fn new(inner: W) -> Self {
+ Self {
+ inner,
+ bytes_written: 0,
+ }
+ }
-/// Parquet file writer API.
-/// Provides methods to write row groups sequentially.
-///
-/// The main workflow should be as following:
-/// - Create file writer, this will open a new file and potentially write some metadata.
-/// - Request a new row group writer by calling `next_row_group`.
-/// - Once finished writing row group, close row group writer by passing it into
-/// `close_row_group` method - this will finalise row group metadata and update metrics.
-/// - Write subsequent row groups, if necessary.
-/// - After all row groups have been written, close the file writer using `close` method.
-pub trait FileWriter {
- /// Creates new row group from this file writer.
- /// In case of IO error or Thrift error, returns `Err`.
- ///
- /// There is no limit on a number of row groups in a file; however, row groups have
- /// to be written sequentially. Every time the next row group is requested, the
- /// previous row group must be finalised and closed using `close_row_group` method.
- fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter>>;
+ /// Returns the number of bytes written to this instance
+ pub fn bytes_written(&self) -> usize {
+ self.bytes_written
+ }
+}
- /// Finalises and closes row group that was created using `next_row_group` method.
- /// After calling this method, the next row group is available for writes.
- fn close_row_group(
- &mut self,
- row_group_writer: Box<dyn RowGroupWriter>,
- ) -> Result<()>;
+impl<W: Write> Write for TrackedWrite<W> {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ let bytes = self.inner.write(buf)?;
+ self.bytes_written += bytes;
+ Ok(bytes)
+ }
- /// Closes and finalises file writer, returning the file metadata.
- ///
- /// All row groups must be appended before this method is called.
- /// No writes are allowed after this point.
- ///
- /// Can be called multiple times. It is up to implementation to either result in
- /// no-op, or return an `Err` for subsequent calls.
- fn close(&mut self) -> Result<parquet::FileMetaData>;
+ fn flush(&mut self) -> std::io::Result<()> {
+ self.inner.flush()
+ }
}
-/// Parquet row group writer API.
-/// Provides methods to access column writers in an iterator-like fashion, order is
-/// guaranteed to match the order of schema leaves (column descriptors).
+/// Callback invoked on closing a column chunk, arguments are:
///
-/// All columns should be written sequentially; the main workflow is:
-/// - Request the next column using `next_column` method - this will return `None` if no
-/// more columns are available to write.
-/// - Once done writing a column, close column writer with `close_column` method - this
-/// will finalise column chunk metadata and update row group metrics.
-/// - Once all columns have been written, close row group writer with `close` method -
-/// it will return row group metadata and is no-op on already closed row group.
-pub trait RowGroupWriter {
- /// Returns the next column writer, if available; otherwise returns `None`.
- /// In case of any IO error or Thrift error, or if row group writer has already been
- /// closed returns `Err`.
- ///
- /// To request the next column writer, the previous one must be finalised and closed
- /// using `close_column`.
- fn next_column(&mut self) -> Result<Option<ColumnWriter>>;
+/// - the number of bytes written
+/// - the number of rows written
+/// - the column chunk metadata
+///
+pub type OnCloseColumnChunk<'a> =
+ Box<dyn FnOnce(u64, u64, ColumnChunkMetaData) -> Result<()> + 'a>;
- /// Closes column writer that was created using `next_column` method.
- /// This should be called before requesting the next column writer.
- fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()>;
+/// Callback invoked on closing a row group, arguments are:
+///
+/// - the row group metadata
+pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaDataPtr) -> Result<()> + 'a>;
- /// Closes this row group writer and returns row group metadata.
- /// After calling this method row group writer must not be used.
- ///
- /// It is recommended to call this method before requesting another row group, but it
- /// will be closed automatically before returning a new row group.
- ///
- /// Can be called multiple times. In subsequent calls will result in no-op and return
- /// already created row group metadata.
- fn close(&mut self) -> Result<RowGroupMetaDataPtr>;
-}
+#[deprecated = "use std::io::Write"]
+pub trait ParquetWriter: Write + std::io::Seek + TryClone {}
+#[allow(deprecated)]
+impl<T: Write + std::io::Seek + TryClone> ParquetWriter for T {}
// ----------------------------------------------------------------------
// Serialized impl for file & row group writers
-pub trait ParquetWriter: Write + Seek + TryClone {}
-impl<T: Write + Seek + TryClone> ParquetWriter for T {}
-
-/// A serialized implementation for Parquet [`FileWriter`].
-/// See documentation on file writer for more information.
-pub struct SerializedFileWriter<W: ParquetWriter> {
- buf: W,
+/// Parquet file writer API.
+/// Provides methods to write row groups sequentially.
+///
+/// The main workflow should be as follows:
+/// - Create a file writer; this will open a new file and potentially write some metadata.
+/// - Request a new row group writer by calling `next_row_group`.
+/// - Once finished writing a row group, close the row group writer by calling `close`.
+/// - Write subsequent row groups, if necessary.
+/// - After all row groups have been written, close the file writer using the `close` method.
+pub struct SerializedFileWriter<W: Write> {
+ buf: TrackedWrite<W>,
schema: TypePtr,
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
- total_num_rows: i64,
row_groups: Vec<RowGroupMetaDataPtr>,
- previous_writer_closed: bool,
- is_closed: bool,
+ row_group_index: usize,
}
-impl<W: ParquetWriter> SerializedFileWriter<W> {
+impl<W: Write> SerializedFileWriter<W> {
/// Creates new file writer.
- pub fn new(
- mut buf: W,
- schema: TypePtr,
- properties: WriterPropertiesPtr,
- ) -> Result<Self> {
+ pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
+ let mut buf = TrackedWrite::new(buf);
Self::start_file(&mut buf)?;
Ok(Self {
buf,
schema: schema.clone(),
descr: Arc::new(SchemaDescriptor::new(schema)),
props: properties,
- total_num_rows: 0,
- row_groups: Vec::new(),
- previous_writer_closed: true,
- is_closed: false,
+ row_groups: vec![],
+ row_group_index: 0,
})
}
- /// Writes magic bytes at the beginning of the file.
- fn start_file(buf: &mut W) -> Result<()> {
- buf.write_all(&PARQUET_MAGIC)?;
- Ok(())
+ /// Creates new row group from this file writer.
+ /// In case of IO error or Thrift error, returns `Err`.
+ ///
+    /// There is no limit on the number of row groups in a file; however, row groups have
+    /// to be written sequentially. Every time the next row group is requested, the
+    /// previous row group must be finalised and closed using `SerializedRowGroupWriter::close`.
+ pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
+ self.assert_previous_writer_closed()?;
+ self.row_group_index += 1;
+
+ let row_groups = &mut self.row_groups;
+ let on_close = |metadata| {
+ row_groups.push(metadata);
+ Ok(())
+ };
+
+ let row_group_writer = SerializedRowGroupWriter::new(
+ self.descr.clone(),
+ self.props.clone(),
+ &mut self.buf,
+ Some(Box::new(on_close)),
+ );
+ Ok(row_group_writer)
+ }
+
+ /// Closes and finalises file writer, returning the file metadata.
+ ///
+    /// All row groups must be appended before this method is called.
+    /// Consumes `self`, so no further writes are possible after this point.
+ pub fn close(mut self) -> Result<parquet::FileMetaData> {
+ self.assert_previous_writer_closed()?;
+ let metadata = self.write_metadata()?;
+ Ok(metadata)
}
- /// Finalises active row group writer, otherwise no-op.
- fn finalise_row_group_writer(
- &mut self,
- mut row_group_writer: Box<dyn RowGroupWriter>,
- ) -> Result<()> {
- let row_group_metadata = row_group_writer.close()?;
- self.total_num_rows += row_group_metadata.num_rows();
- self.row_groups.push(row_group_metadata);
+ /// Writes magic bytes at the beginning of the file.
+ fn start_file(buf: &mut TrackedWrite<W>) -> Result<()> {
+ buf.write_all(&PARQUET_MAGIC)?;
Ok(())
}
/// Assembles and writes metadata at the end of the file.
fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
+ let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
+
+ let row_groups = self
+ .row_groups
+ .as_slice()
+ .iter()
+ .map(|v| v.to_thrift())
+ .collect();
+
let file_metadata = parquet::FileMetaData {
+ num_rows,
+ row_groups,
version: self.props.writer_version().as_num(),
schema: types::to_thrift(self.schema.as_ref())?,
- num_rows: self.total_num_rows as i64,
- row_groups: self
- .row_groups
- .as_slice()
- .iter()
- .map(|v| v.to_thrift())
- .collect(),
key_value_metadata: self.props.key_value_metadata().cloned(),
created_by: Some(self.props.created_by().to_owned()),
column_orders: None,
@@ -196,13 +196,13 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
};
// Write file metadata
- let start_pos = self.buf.seek(SeekFrom::Current(0))?;
+ let start_pos = self.buf.bytes_written();
{
let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
file_metadata.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
}
- let end_pos = self.buf.seek(SeekFrom::Current(0))?;
+ let end_pos = self.buf.bytes_written();
// Write footer
let mut footer_buffer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE];
@@ -213,18 +213,9 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
Ok(file_metadata)
}
- #[inline]
- fn assert_closed(&self) -> Result<()> {
- if self.is_closed {
- Err(general_err!("File writer is closed"))
- } else {
- Ok(())
- }
- }
-
#[inline]
fn assert_previous_writer_closed(&self) -> Result<()> {
- if !self.previous_writer_closed {
+ if self.row_group_index != self.row_groups.len() {
Err(general_err!("Previous row group writer was not closed"))
} else {
Ok(())
@@ -232,157 +223,107 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
}
}
-impl<W: 'static + ParquetWriter> FileWriter for SerializedFileWriter<W> {
- #[inline]
- fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter>> {
- self.assert_closed()?;
- self.assert_previous_writer_closed()?;
- let row_group_writer = SerializedRowGroupWriter::new(
- self.descr.clone(),
- self.props.clone(),
- &self.buf,
- );
- self.previous_writer_closed = false;
- Ok(Box::new(row_group_writer))
- }
-
- #[inline]
- fn close_row_group(
- &mut self,
- row_group_writer: Box<dyn RowGroupWriter>,
- ) -> Result<()> {
- self.assert_closed()?;
- let res = self.finalise_row_group_writer(row_group_writer);
- self.previous_writer_closed = res.is_ok();
- res
- }
-
- #[inline]
- fn close(&mut self) -> Result<parquet::FileMetaData> {
- self.assert_closed()?;
- self.assert_previous_writer_closed()?;
- let metadata = self.write_metadata()?;
- self.is_closed = true;
- Ok(metadata)
- }
-}
-
-/// A serialized implementation for Parquet [`RowGroupWriter`].
-/// Coordinates writing of a row group with column writers.
-/// See documentation on row group writer for more information.
-pub struct SerializedRowGroupWriter<W: ParquetWriter> {
+/// Parquet row group writer API.
+/// Provides methods to access column writers in an iterator-like fashion, order is
+/// guaranteed to match the order of schema leaves (column descriptors).
+///
+/// All columns should be written sequentially; the main workflow is:
+/// - Request the next column using `next_column` method - this will return `None` if no
+/// more columns are available to write.
+/// - Once done writing a column, close the column writer with `close`
+/// - Once all columns have been written, close the row group writer with `close` -
+///   it will return the row group metadata
+pub struct SerializedRowGroupWriter<'a, W: Write> {
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
- buf: W,
+ buf: &'a mut TrackedWrite<W>,
total_rows_written: Option<u64>,
total_bytes_written: u64,
column_index: usize,
- previous_writer_closed: bool,
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaData>,
+ on_close: Option<OnCloseRowGroup<'a>>,
}
-impl<W: 'static + ParquetWriter> SerializedRowGroupWriter<W> {
+impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+ /// Creates a new `SerializedRowGroupWriter` with:
+ ///
+ /// - `schema_descr` - the schema to write
+ /// - `properties` - writer properties
+ /// - `buf` - the buffer to write data to
+    /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
pub fn new(
schema_descr: SchemaDescPtr,
properties: WriterPropertiesPtr,
- buf: &W,
+ buf: &'a mut TrackedWrite<W>,
+ on_close: Option<OnCloseRowGroup<'a>>,
) -> Self {
let num_columns = schema_descr.num_columns();
Self {
+ buf,
+ on_close,
+ total_rows_written: None,
descr: schema_descr,
props: properties,
- buf: buf.try_clone().unwrap(),
- total_rows_written: None,
- total_bytes_written: 0,
column_index: 0,
- previous_writer_closed: true,
row_group_metadata: None,
column_chunks: Vec::with_capacity(num_columns),
+ total_bytes_written: 0,
}
}
- /// Checks and finalises current column writer.
- fn finalise_column_writer(&mut self, writer: ColumnWriter) -> Result<()> {
- let (bytes_written, rows_written, metadata) = match writer {
- ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
- ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
- ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
- ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
- ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
- ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
- ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
- ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
- };
-
- // Update row group writer metrics
- self.total_bytes_written += bytes_written;
- self.column_chunks.push(metadata);
- if let Some(rows) = self.total_rows_written {
- if rows != rows_written {
- return Err(general_err!(
- "Incorrect number of rows, expected {} != {} rows",
- rows,
- rows_written
- ));
- }
- } else {
- self.total_rows_written = Some(rows_written);
- }
-
- Ok(())
- }
-
- #[inline]
- fn assert_closed(&self) -> Result<()> {
- if self.row_group_metadata.is_some() {
- Err(general_err!("Row group writer is closed"))
- } else {
- Ok(())
- }
- }
-
- #[inline]
- fn assert_previous_writer_closed(&self) -> Result<()> {
- if !self.previous_writer_closed {
- Err(general_err!("Previous column writer was not closed"))
- } else {
- Ok(())
- }
- }
-}
-
-impl<W: 'static + ParquetWriter> RowGroupWriter for SerializedRowGroupWriter<W> {
- #[inline]
- fn next_column(&mut self) -> Result<Option<ColumnWriter>> {
- self.assert_closed()?;
+ /// Returns the next column writer, if available; otherwise returns `None`.
+ /// In case of any IO error or Thrift error, or if row group writer has already been
+ /// closed returns `Err`.
+ pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
self.assert_previous_writer_closed()?;
if self.column_index >= self.descr.num_columns() {
return Ok(None);
}
- let sink = FileSink::new(&self.buf);
- let page_writer = Box::new(SerializedPageWriter::new(sink));
+ let page_writer = Box::new(SerializedPageWriter::new(self.buf));
let column_writer = get_column_writer(
self.descr.column(self.column_index),
self.props.clone(),
page_writer,
);
self.column_index += 1;
- self.previous_writer_closed = false;
- Ok(Some(column_writer))
- }
+ let total_bytes_written = &mut self.total_bytes_written;
+ let total_rows_written = &mut self.total_rows_written;
+ let column_chunks = &mut self.column_chunks;
+
+ let on_close = |bytes_written, rows_written, metadata| {
+ // Update row group writer metrics
+ *total_bytes_written += bytes_written;
+ column_chunks.push(metadata);
+ if let Some(rows) = *total_rows_written {
+ if rows != rows_written {
+ return Err(general_err!(
+ "Incorrect number of rows, expected {} != {} rows",
+ rows,
+ rows_written
+ ));
+ }
+ } else {
+ *total_rows_written = Some(rows_written);
+ }
- #[inline]
- fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()> {
- let res = self.finalise_column_writer(column_writer);
- self.previous_writer_closed = res.is_ok();
- res
+ Ok(())
+ };
+
+ Ok(Some(SerializedColumnWriter::new(
+ column_writer,
+ Some(Box::new(on_close)),
+ )))
}
- #[inline]
- fn close(&mut self) -> Result<RowGroupMetaDataPtr> {
+    /// Closes this row group writer and returns the row group metadata.
+    ///
+    /// Consumes `self`, so the writer cannot be used after this call.
+ pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
if self.row_group_metadata.is_none() {
self.assert_previous_writer_closed()?;
@@ -393,25 +334,86 @@ impl<W: 'static + ParquetWriter> RowGroupWriter for SerializedRowGroupWriter<W>
.set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
.build()?;
- self.row_group_metadata = Some(Arc::new(row_group_metadata));
+ let metadata = Arc::new(row_group_metadata);
+ self.row_group_metadata = Some(metadata.clone());
+
+ if let Some(on_close) = self.on_close.take() {
+ on_close(metadata)?
+ }
}
let metadata = self.row_group_metadata.as_ref().unwrap().clone();
Ok(metadata)
}
+
+ #[inline]
+ fn assert_previous_writer_closed(&self) -> Result<()> {
+ if self.column_index != self.column_chunks.len() {
+ Err(general_err!("Previous column writer was not closed"))
+ } else {
+ Ok(())
+ }
+ }
+}
+
+/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
+pub struct SerializedColumnWriter<'a> {
+ inner: ColumnWriter<'a>,
+ on_close: Option<OnCloseColumnChunk<'a>>,
+}
+
+impl<'a> SerializedColumnWriter<'a> {
+    /// Create a new [`SerializedColumnWriter`] from a [`ColumnWriter`] and an
+ /// optional callback to be invoked on [`Self::close`]
+ pub fn new(
+ inner: ColumnWriter<'a>,
+ on_close: Option<OnCloseColumnChunk<'a>>,
+ ) -> Self {
+ Self { inner, on_close }
+ }
+
+ /// Returns a reference to an untyped [`ColumnWriter`]
+ pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
+ &mut self.inner
+ }
+
+ /// Returns a reference to a typed [`ColumnWriterImpl`]
+ pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
+ get_typed_column_writer_mut(&mut self.inner)
+ }
+
+    /// Close this [`SerializedColumnWriter`]
+ pub fn close(mut self) -> Result<()> {
+ let (bytes_written, rows_written, metadata) = match self.inner {
+ ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
+ ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
+ ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
+ ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
+ ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
+ ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
+ ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
+ ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
+ };
+
+ if let Some(on_close) = self.on_close.take() {
+ on_close(bytes_written, rows_written, metadata)?
+ }
+
+ Ok(())
+ }
}
/// A serialized implementation for Parquet [`PageWriter`].
/// Writes and serializes pages and metadata into output stream.
///
/// `SerializedPageWriter` should not be used after calling `close()`.
-pub struct SerializedPageWriter<T: Write + Position> {
- sink: T,
+pub struct SerializedPageWriter<'a, W> {
+ sink: &'a mut TrackedWrite<W>,
}
-impl<T: Write + Position> SerializedPageWriter<T> {
+impl<'a, W: Write> SerializedPageWriter<'a, W> {
/// Creates new page writer.
- pub fn new(sink: T) -> Self {
+ pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
Self { sink }
}
@@ -419,13 +421,13 @@ impl<T: Write + Position> SerializedPageWriter<T> {
/// Returns number of bytes that have been written into the sink.
#[inline]
fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
- let start_pos = self.sink.pos();
+ let start_pos = self.sink.bytes_written();
{
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
header.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
}
- Ok((self.sink.pos() - start_pos) as usize)
+ Ok(self.sink.bytes_written() - start_pos)
}
/// Serializes column chunk into Thrift.
@@ -439,7 +441,7 @@ impl<T: Write + Position> SerializedPageWriter<T> {
}
}
-impl<T: Write + Position> PageWriter for SerializedPageWriter<T> {
+impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
let uncompressed_size = page.uncompressed_size();
let compressed_size = page.compressed_size();
@@ -506,7 +508,7 @@ impl<T: Write + Position> PageWriter for SerializedPageWriter<T> {
}
}
- let start_pos = self.sink.pos();
+ let start_pos = self.sink.bytes_written() as u64;
let header_size = self.serialize_page_header(page_header)?;
self.sink.write_all(page.data())?;
@@ -516,7 +518,7 @@ impl<T: Write + Position> PageWriter for SerializedPageWriter<T> {
spec.uncompressed_size = uncompressed_size + header_size;
spec.compressed_size = compressed_size + header_size;
spec.offset = start_pos;
- spec.bytes_written = self.sink.pos() - start_pos;
+ spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
// Number of values is incremented for data pages only
if page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2 {
spec.num_values = num_values;
@@ -544,6 +546,7 @@ mod tests {
use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
use crate::column::page::PageReader;
use crate::compression::{create_codec, Codec};
+ use crate::data_type::Int32Type;
use crate::file::{
properties::{WriterProperties, WriterVersion},
reader::{FileReader, SerializedFileReader, SerializedPageReader},
@@ -552,48 +555,6 @@ mod tests {
use crate::record::RowAccessor;
use crate::util::memory::ByteBufferPtr;
- #[test]
- fn test_file_writer_error_after_close() {
- let file = tempfile::tempfile().unwrap();
- let schema = Arc::new(types::Type::group_type_builder("schema").build().unwrap());
- let props = Arc::new(WriterProperties::builder().build());
- let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
- writer.close().unwrap();
- {
- let res = writer.next_row_group();
- assert!(res.is_err());
- if let Err(err) = res {
- assert_eq!(format!("{}", err), "Parquet error: File writer is closed");
- }
- }
- {
- let res = writer.close();
- assert!(res.is_err());
- if let Err(err) = res {
- assert_eq!(format!("{}", err), "Parquet error: File writer is closed");
- }
- }
- }
-
- #[test]
- fn test_row_group_writer_error_after_close() {
- let file = tempfile::tempfile().unwrap();
- let schema = Arc::new(types::Type::group_type_builder("schema").build().unwrap());
- let props = Arc::new(WriterProperties::builder().build());
- let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
- let mut row_group_writer = writer.next_row_group().unwrap();
- row_group_writer.close().unwrap();
-
- let res = row_group_writer.next_column();
- assert!(res.is_err());
- if let Err(err) = res {
- assert_eq!(
- format!("{}", err),
- "Parquet error: Row group writer is closed"
- );
- }
- }
-
#[test]
fn test_row_group_writer_error_not_all_columns_written() {
let file = tempfile::tempfile().unwrap();
@@ -609,7 +570,7 @@ mod tests {
);
let props = Arc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
- let mut row_group_writer = writer.next_row_group().unwrap();
+ let row_group_writer = writer.next_row_group().unwrap();
let res = row_group_writer.close();
assert!(res.is_err());
if let Err(err) = res {
@@ -647,24 +608,23 @@ mod tests {
let mut row_group_writer = writer.next_row_group().unwrap();
let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
- if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
- typed.write_batch(&[1, 2, 3], None, None).unwrap();
- }
- row_group_writer.close_column(col_writer).unwrap();
+ col_writer
+ .typed::<Int32Type>()
+ .write_batch(&[1, 2, 3], None, None)
+ .unwrap();
+ col_writer.close().unwrap();
let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
- if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
- typed.write_batch(&[1, 2], None, None).unwrap();
- }
+ col_writer
+ .typed::<Int32Type>()
+ .write_batch(&[1, 2], None, None)
+ .unwrap();
- let res = row_group_writer.close_column(col_writer);
- assert!(res.is_err());
- if let Err(err) = res {
- assert_eq!(
- format!("{}", err),
- "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
- );
- }
+ let err = col_writer.close().unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
+ );
}
#[test]
@@ -682,7 +642,7 @@ mod tests {
.unwrap(),
);
let props = Arc::new(WriterProperties::builder().build());
- let mut writer =
+ let writer =
SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
writer.close().unwrap();
@@ -712,7 +672,7 @@ mod tests {
)]))
.build(),
);
- let mut writer =
+ let writer =
SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
writer.close().unwrap();
@@ -758,7 +718,7 @@ mod tests {
.set_writer_version(WriterVersion::PARQUET_2_0)
.build(),
);
- let mut writer =
+ let writer =
SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
writer.close().unwrap();
@@ -971,8 +931,8 @@ mod tests {
let mut buffer: Vec<u8> = vec![];
let mut result_pages: Vec<Page> = vec![];
{
- let cursor = Cursor::new(&mut buffer);
- let mut page_writer = SerializedPageWriter::new(cursor);
+ let mut writer = TrackedWrite::new(&mut buffer);
+ let mut page_writer = SerializedPageWriter::new(&mut writer);
for page in compressed_pages {
page_writer.write_page(page).unwrap();
@@ -1041,22 +1001,15 @@ mod tests {
for subset in &data {
let mut row_group_writer = file_writer.next_row_group().unwrap();
- let col_writer = row_group_writer.next_column().unwrap();
- if let Some(mut writer) = col_writer {
- match writer {
- ColumnWriter::Int32ColumnWriter(ref mut typed) => {
- rows +=
- typed.write_batch(&subset[..], None, None).unwrap() as i64;
- }
- _ => {
- unimplemented!();
- }
- }
- row_group_writer.close_column(writer).unwrap();
+ if let Some(mut writer) = row_group_writer.next_column().unwrap() {
+ rows += writer
+ .typed::<Int32Type>()
+ .write_batch(&subset[..], None, None)
+ .unwrap() as i64;
+ writer.close().unwrap();
}
- file_writer.close_row_group(row_group_writer).unwrap();
+ row_group_writer.close().unwrap();
}
-
file_writer.close().unwrap();
let reader = assert_send(SerializedFileReader::new(file).unwrap());
@@ -1101,7 +1054,7 @@ mod tests {
}
fn test_bytes_roundtrip(data: Vec<Vec<i32>>) {
- let cursor = InMemoryWriteableCursor::default();
+ let mut cursor = Cursor::new(vec![]);
let schema = Arc::new(
types::Type::group_type_builder("schema")
@@ -1119,30 +1072,24 @@ mod tests {
{
let props = Arc::new(WriterProperties::builder().build());
let mut writer =
- SerializedFileWriter::new(cursor.clone(), schema, props).unwrap();
+ SerializedFileWriter::new(&mut cursor, schema, props).unwrap();
for subset in &data {
let mut row_group_writer = writer.next_row_group().unwrap();
- let col_writer = row_group_writer.next_column().unwrap();
- if let Some(mut writer) = col_writer {
- match writer {
- ColumnWriter::Int32ColumnWriter(ref mut typed) => {
- rows += typed.write_batch(&subset[..], None, None).unwrap()
- as i64;
- }
- _ => {
- unimplemented!();
- }
- }
- row_group_writer.close_column(writer).unwrap();
+ if let Some(mut writer) = row_group_writer.next_column().unwrap() {
+ rows += writer
+ .typed::<Int32Type>()
+ .write_batch(&subset[..], None, None)
+ .unwrap() as i64;
+
+ writer.close().unwrap();
}
- writer.close_row_group(row_group_writer).unwrap();
+ row_group_writer.close().unwrap();
}
-
writer.close().unwrap();
}
- let buffer = cursor.into_inner().unwrap();
+ let buffer = cursor.into_inner();
let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer);
let reader = SerializedFileReader::new(reading_cursor).unwrap();
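A minimal sketch of TrackedWrite on its own, using only the API introduced above:

    use std::io::Write;
    use parquet::file::writer::TrackedWrite;

    fn main() -> std::io::Result<()> {
        let mut buffer: Vec<u8> = vec![];
        let mut tracked = TrackedWrite::new(&mut buffer);
        tracked.write_all(b"PAR1")?;

        // bytes_written() replaces the Seek-based position queries used before
        assert_eq!(tracked.bytes_written(), 4);
        Ok(())
    }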
diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs
index 6668eec51..fe803a7ff 100644
--- a/parquet/src/record/record_writer.rs
+++ b/parquet/src/record/record_writer.rs
@@ -18,12 +18,12 @@
use crate::schema::types::TypePtr;
use super::super::errors::ParquetError;
-use super::super::file::writer::RowGroupWriter;
+use super::super::file::writer::SerializedRowGroupWriter;
pub trait RecordWriter<T> {
- fn write_to_row_group(
+ fn write_to_row_group<W: std::io::Write>(
&self,
- row_group_writer: &mut Box<dyn RowGroupWriter>,
+ row_group_writer: &mut SerializedRowGroupWriter<W>,
) -> Result<(), ParquetError>;
/// Generated schema
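A hand-written sketch of implementing the updated trait; the row type is
hypothetical, and the `schema` method signature and the
`parquet::record::RecordWriter` import path are assumed from context:

    use std::sync::Arc;
    use parquet::data_type::Int32Type;
    use parquet::errors::ParquetError;
    use parquet::file::writer::SerializedRowGroupWriter;
    use parquet::record::RecordWriter;
    use parquet::schema::parser::parse_message_type;
    use parquet::schema::types::TypePtr;

    struct MyRow {
        id: i32,
    }

    impl RecordWriter<MyRow> for &[MyRow] {
        fn write_to_row_group<W: std::io::Write>(
            &self,
            row_group_writer: &mut SerializedRowGroupWriter<W>,
        ) -> Result<(), ParquetError> {
            // Gather the column values, then write them through the typed accessor
            let ids: Vec<i32> = self.iter().map(|r| r.id).collect();
            let mut column = row_group_writer.next_column()?.expect("one column");
            column.typed::<Int32Type>().write_batch(&ids, None, None)?;
            column.close()
        }

        fn schema(&self) -> Result<TypePtr, ParquetError> {
            // Hypothetical single-column schema matching MyRow
            Ok(Arc::new(parse_message_type(
                "message my_row { REQUIRED INT32 id; }",
            )?))
        }
    }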
diff --git a/parquet/src/util/cursor.rs b/parquet/src/util/cursor.rs
index c03fc66f2..ff7067fcb 100644
--- a/parquet/src/util/cursor.rs
+++ b/parquet/src/util/cursor.rs
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::util::io::TryClone;
use std::io::{self, Cursor, Error, ErrorKind, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Mutex};
use std::{cmp, fmt};
-use crate::file::writer::TryClone;
-
/// This is object to use if your file is already in memory.
/// The sliceable cursor is similar to std::io::Cursor, except that it makes it easy to create "cursor slices".
/// To achieve this, it uses Arc instead of shared references. Indeed reference fields are painful
@@ -134,11 +133,13 @@ impl Seek for SliceableCursor {
}
/// Use this type to write Parquet to memory rather than a file.
+#[deprecated = "use Vec<u8> instead"]
#[derive(Debug, Default, Clone)]
pub struct InMemoryWriteableCursor {
buffer: Arc<Mutex<Cursor<Vec<u8>>>>,
}
+#[allow(deprecated)]
impl InMemoryWriteableCursor {
/// Consume this instance and return the underlying buffer as long as there are no other
/// references to this instance.
@@ -168,6 +169,7 @@ impl InMemoryWriteableCursor {
}
}
+#[allow(deprecated)]
impl TryClone for InMemoryWriteableCursor {
fn try_clone(&self) -> std::io::Result<Self> {
Ok(Self {
@@ -176,6 +178,7 @@ impl TryClone for InMemoryWriteableCursor {
}
}
+#[allow(deprecated)]
impl Write for InMemoryWriteableCursor {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let mut inner = self.buffer.lock().unwrap();
@@ -188,6 +191,7 @@ impl Write for InMemoryWriteableCursor {
}
}
+#[allow(deprecated)]
impl Seek for InMemoryWriteableCursor {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
let mut inner = self.buffer.lock().unwrap();
diff --git a/parquet/src/util/io.rs b/parquet/src/util/io.rs
index c10e6d616..a7b5e7307 100644
--- a/parquet/src/util/io.rs
+++ b/parquet/src/util/io.rs
@@ -17,7 +17,9 @@
use std::{cell::RefCell, cmp, fmt, io::*};
-use crate::file::{reader::Length, writer::ParquetWriter};
+use crate::file::reader::Length;
+#[allow(deprecated)]
+use crate::file::writer::ParquetWriter;
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
@@ -156,6 +158,8 @@ impl<R: ParquetReader> Length for FileSource<R> {
/// Struct that represents `File` output stream with position tracking.
/// Used as a sink in file writer.
+#[deprecated = "use TrackedWrite instead"]
+#[allow(deprecated)]
pub struct FileSink<W: ParquetWriter> {
buf: BufWriter<W>,
// This is not necessarily position in the underlying file,
@@ -163,6 +167,7 @@ pub struct FileSink<W: ParquetWriter> {
pos: u64,
}
+#[allow(deprecated)]
impl<W: ParquetWriter> FileSink<W> {
/// Creates new file sink.
/// Position is set to whatever position file has.
@@ -176,6 +181,7 @@ impl<W: ParquetWriter> FileSink<W> {
}
}
+#[allow(deprecated)]
impl<W: ParquetWriter> Write for FileSink<W> {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
let num_bytes = self.buf.write(buf)?;
@@ -188,6 +194,7 @@ impl<W: ParquetWriter> Write for FileSink<W> {
}
}
+#[allow(deprecated)]
impl<W: ParquetWriter> Position for FileSink<W> {
fn pos(&self) -> u64 {
self.pos
@@ -271,6 +278,7 @@ mod tests {
}
#[test]
+ #[allow(deprecated)]
fn test_io_write_with_pos() {
let mut file = tempfile::tempfile().unwrap();
file.write_all(&[b'a', b'b', b'c']).unwrap();
diff --git a/parquet/tests/boolean_writer.rs b/parquet/tests/boolean_writer.rs
index b9d757e71..dc2eccfbf 100644
--- a/parquet/tests/boolean_writer.rs
+++ b/parquet/tests/boolean_writer.rs
@@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use parquet::column::writer::ColumnWriter;
+use parquet::data_type::BoolType;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
-use parquet::file::writer::FileWriter;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;
use std::fs;
@@ -53,25 +52,15 @@ fn it_writes_data_without_hanging() {
while let Some(mut col_writer) =
row_group_writer.next_column().expect("next column")
{
- match col_writer {
- ColumnWriter::BoolColumnWriter(ref mut typed_writer) => {
- typed_writer
- .write_batch(&my_bool_values, None, None)
- .expect("writing bool column");
- }
- _ => {
- panic!("only test boolean values");
- }
- }
- row_group_writer
- .close_column(col_writer)
- .expect("close column");
+ col_writer
+ .typed::<BoolType>()
+ .write_batch(&my_bool_values, None, None)
+ .expect("writing bool column");
+
+ col_writer.close().expect("close column");
}
let rg_md = row_group_writer.close().expect("close row group");
println!("total rows written: {}", rg_md.num_rows());
- writer
- .close_row_group(row_group_writer)
- .expect("close row groups");
}
writer.close().expect("close writer");
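
The rewritten test shows the new column lifecycle: next_column() yields a SerializedColumnWriter, typed::<T>() exposes the strongly typed writer, and the column closes itself rather than being handed back to the row group. Condensed into a self-contained helper:

    use parquet::data_type::BoolType;
    use parquet::errors::ParquetError;
    use parquet::file::writer::SerializedRowGroupWriter;

    fn write_bool_column<W: std::io::Write>(
        row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
        values: &[bool],
    ) -> Result<(), ParquetError> {
        while let Some(mut col_writer) = row_group_writer.next_column()? {
            // `typed` replaces the old match on the ColumnWriter enum.
            col_writer
                .typed::<BoolType>()
                .write_batch(values, None, None)?;
            // Closing the column replaces `close_column(col_writer)`.
            col_writer.close()?;
        }
        Ok(())
    }
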
diff --git a/parquet/tests/custom_writer.rs b/parquet/tests/custom_writer.rs
deleted file mode 100644
index 0a57e79d9..000000000
--- a/parquet/tests/custom_writer.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::fs::File;
-use std::{
- fs,
- io::{prelude::*, SeekFrom},
- sync::Arc,
-};
-
-use parquet::file::writer::TryClone;
-use parquet::{
- basic::Repetition, basic::Type, file::properties::WriterProperties,
- file::writer::SerializedFileWriter, schema::types,
-};
-use std::env;
-
-// Test creating some sort of custom writer to ensure the
-// appropriate traits are exposed
-struct CustomWriter {
- file: File,
-}
-
-impl Write for CustomWriter {
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- self.file.write(buf)
- }
- fn flush(&mut self) -> std::io::Result<()> {
- self.file.flush()
- }
-}
-
-impl Seek for CustomWriter {
- fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
- self.file.seek(pos)
- }
-}
-
-impl TryClone for CustomWriter {
- fn try_clone(&self) -> std::io::Result<Self> {
- use std::io::{Error, ErrorKind};
- Err(Error::new(ErrorKind::Other, "Clone not supported"))
- }
-}
-
-#[test]
-fn test_custom_writer() {
- let schema = Arc::new(
- types::Type::group_type_builder("schema")
- .with_fields(&mut vec![Arc::new(
- types::Type::primitive_type_builder("col1", Type::INT32)
- .with_repetition(Repetition::REQUIRED)
- .build()
- .unwrap(),
- )])
- .build()
- .unwrap(),
- );
- let props = Arc::new(WriterProperties::builder().build());
-
- let file = get_temp_file("test_custom_file_writer");
- let test_file = file.try_clone().unwrap();
-
- let writer = CustomWriter { file };
-
- // test is that this file can be created
- let file_writer = SerializedFileWriter::new(writer, schema, props).unwrap();
- std::mem::drop(file_writer);
-
- // ensure the file now exists and has non zero size
- let metadata = test_file.metadata().unwrap();
- assert!(metadata.len() > 0);
-}
-
-/// Returns file handle for a temp file in 'target' directory with a provided content
-fn get_temp_file(file_name: &str) -> fs::File {
- // build tmp path to a file in "target/debug/testdata"
- let mut path_buf = env::current_dir().unwrap();
- path_buf.push("target");
- path_buf.push("debug");
- path_buf.push("testdata");
- fs::create_dir_all(&path_buf).unwrap();
- path_buf.push(file_name);
-
- File::create(path_buf).unwrap()
-}
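
This test is deleted because it is redundant under the new API: a custom sink only has to implement std::io::Write, with no Seek and no TryClone. A minimal sketch of the same idea rewritten (the output path is illustrative):

    use std::fs::File;
    use std::io::Write;
    use std::sync::Arc;

    use parquet::basic::{Repetition, Type};
    use parquet::file::properties::WriterProperties;
    use parquet::file::writer::SerializedFileWriter;
    use parquet::schema::types;

    // Under the new API a custom sink only needs `std::io::Write`.
    struct CustomWriter {
        file: File,
    }

    impl Write for CustomWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.file.write(buf)
        }
        fn flush(&mut self) -> std::io::Result<()> {
            self.file.flush()
        }
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(&mut vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .with_repetition(Repetition::REQUIRED)
                        .build()?,
                )])
                .build()?,
        );
        let props = Arc::new(WriterProperties::builder().build());
        let writer = CustomWriter {
            file: File::create("example.parquet")?, // hypothetical path
        };
        let file_writer = SerializedFileWriter::new(writer, schema, props)?;
        file_writer.close()?;
        Ok(())
    }
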
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index d5d1e6818..fc7af20ca 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -98,9 +98,9 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
(quote! {
impl #generics RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
- fn write_to_row_group(
+ fn write_to_row_group<W: std::io::Write>(
&self,
- row_group_writer: &mut Box<dyn parquet::file::writer::RowGroupWriter>
+ row_group_writer: &mut parquet::file::writer::SerializedRowGroupWriter<'_, W>
) -> Result<(), parquet::errors::ParquetError> {
let mut row_group_writer = row_group_writer;
let records = &self; // Used by all the writer snippets to be more clear
@@ -110,7 +110,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
let mut some_column_writer = row_group_writer.next_column().unwrap();
if let Some(mut column_writer) = some_column_writer {
#writer_snippets
- row_group_writer.close_column(column_writer)?;
+ column_writer.close()?;
} else {
return Err(parquet::errors::ParquetError::General("Failed to get next column".into()))
}
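
With the generated method now generic over W: std::io::Write, a derived writer works against any SerializedFileWriter. A usage sketch (the struct and its field are illustrative, mirroring the counter example in the tests below):

    use parquet::errors::ParquetError;
    use parquet::file::writer::SerializedFileWriter;
    use parquet::record::RecordWriter;
    use parquet_derive::ParquetRecordWriter;

    #[derive(ParquetRecordWriter)]
    struct Sample {
        counter: i64,
    }

    fn write_samples<W: std::io::Write>(
        writer: &mut SerializedFileWriter<W>,
        records: &[Sample],
    ) -> Result<(), ParquetError> {
        let mut row_group = writer.next_row_group()?;
        // The derive emits `write_to_row_group<W: std::io::Write>`.
        records.write_to_row_group(&mut row_group)?;
        row_group.close()?;
        Ok(())
    }
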
diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs
index be2e6efaa..835ac793e 100644
--- a/parquet_derive/src/parquet_field.rs
+++ b/parquet_derive/src/parquet_field.rs
@@ -147,7 +147,7 @@ impl Field {
// this expression just switches between non-nullable and nullable write statements
let write_batch_expr = if definition_levels.is_some() {
quote! {
- if let #column_writer(ref mut typed) = column_writer {
+ if let #column_writer(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{#ident})
@@ -155,7 +155,7 @@ impl Field {
}
} else {
quote! {
- if let #column_writer(ref mut typed) = column_writer {
+ if let #column_writer(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], None, None)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{#ident})
@@ -666,7 +666,7 @@ mod test {
{
let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( );
- if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer {
+ if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer.untyped() {
typed . write_batch ( & vals [ .. ] , None , None ) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
@@ -703,7 +703,7 @@ mod test {
}
}).collect();
- if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer {
+ if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
} else {
panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
@@ -727,7 +727,7 @@ mod test {
}
}).collect();
- if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer {
+ if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
} else {
panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
@@ -750,7 +750,7 @@ mod test {
}
}).collect();
- if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer {
+ if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer.untyped() {
typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
} else {
panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
@@ -975,7 +975,7 @@ mod test {
assert_eq!(when.writer_snippet().to_string(),(quote!{
{
let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
- if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
+ if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], None, None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
@@ -995,7 +995,7 @@ mod test {
}
}).collect();
- if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
+ if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
@@ -1018,7 +1018,7 @@ mod test {
assert_eq!(when.writer_snippet().to_string(),(quote!{
{
let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
- if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
+ if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], None, None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
@@ -1038,7 +1038,7 @@ mod test {
}
}).collect();
- if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
+ if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
@@ -1061,7 +1061,7 @@ mod test {
assert_eq!(when.writer_snippet().to_string(),(quote!{
{
let vals : Vec<_> = records.iter().map(|rec| (&rec.unique_id.to_string()[..]).into() ).collect();
- if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
+ if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], None, None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
@@ -1081,7 +1081,7 @@ mod test {
}
}).collect();
- if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
+ if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index 2b7c060bb..189802b9a 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -54,10 +54,7 @@ mod tests {
use super::*;
use parquet::{
- file::{
- properties::WriterProperties,
- writer::{FileWriter, SerializedFileWriter},
- },
+ file::{properties::WriterProperties, writer::SerializedFileWriter},
schema::parser::parse_message_type,
};
use std::{env, fs, io::Write, sync::Arc};
@@ -133,7 +130,7 @@ mod tests {
let mut row_group = writer.next_row_group().unwrap();
drs.as_slice().write_to_row_group(&mut row_group).unwrap();
- writer.close_row_group(row_group).unwrap();
+ row_group.close().unwrap();
writer.close().unwrap();
}
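
End to end, the reworked writer path then reads as below; a minimal sketch assuming an in-memory sink and an illustrative one-column schema:

    use std::sync::Arc;

    use parquet::data_type::Int32Type;
    use parquet::file::properties::WriterProperties;
    use parquet::file::writer::SerializedFileWriter;
    use parquet::schema::parser::parse_message_type;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(parse_message_type(
            "message schema { REQUIRED INT32 id; }",
        )?);
        let props = Arc::new(WriterProperties::builder().build());

        // Any `std::io::Write` sink; a Vec<u8> keeps it in memory.
        let mut writer = SerializedFileWriter::new(Vec::new(), schema, props)?;

        let mut row_group = writer.next_row_group()?;
        while let Some(mut col) = row_group.next_column()? {
            col.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None)?;
            col.close()?;
        }
        // Row groups and the file close in place; nothing is handed
        // back to a parent writer.
        row_group.close()?;
        writer.close()?;
        Ok(())
    }
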