Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/23 10:52:41 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #1719: Change parquet `ArrowFileWriter` to use standard `std::io::Write` rather than custom `ParquetWriter` trait (#1717) (#1163)

alamb commented on code in PR #1719:
URL: https://github.com/apache/arrow-rs/pull/1719#discussion_r879302492


##########
parquet/src/file/writer.rs:
##########
@@ -92,102 +111,90 @@ pub trait FileWriter {
 /// All columns should be written sequentially; the main workflow is:
 /// - Request the next column using `next_column` method - this will return `None` if no
 /// more columns are available to write.
-/// - Once done writing a column, close column writer with `close_column` method - this
-/// will finalise column chunk metadata and update row group metrics.
+/// - Once done writing a column, close column writer with `close`

Review Comment:
   What is the reason to remove the rationale ('this will finalize column chunk metadata ...')?



##########
parquet/src/util/cursor.rs:
##########
@@ -133,68 +131,6 @@ impl Seek for SliceableCursor {
     }
 }
 
-/// Use this type to write Parquet to memory rather than a file.
-#[derive(Debug, Default, Clone)]

Review Comment:
   In terms of easing the transition, what would you think about marking this struct as deprecated and keeping the code backwards compatible for a few releases (e.g. `impl Write for InMemoryWriteableCursor`)?
   
   That might make the API change easier to manage and give users some time to remove it.
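   
   Something like the following might work. This is a rough sketch: the `since` version and the note text are placeholders, and I'm assuming the struct keeps its current `Arc<Mutex<Cursor<Vec<u8>>>>` internals.
   
   ```rust
   use std::io::{self, Cursor, Write};
   use std::sync::{Arc, Mutex};
   
   /// Kept only for backwards compatibility; a plain `Vec<u8>` (or any other
   /// `std::io::Write` implementation) can now be passed to the writers directly.
   #[deprecated(since = "15.0.0", note = "use any `std::io::Write` impl instead")]
   #[derive(Debug, Default, Clone)]
   pub struct InMemoryWriteableCursor {
       buffer: Arc<Mutex<Cursor<Vec<u8>>>>,
   }
   
   #[allow(deprecated)]
   impl Write for InMemoryWriteableCursor {
       fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
           // Delegate to the inner cursor so existing callers keep compiling
           self.buffer.lock().unwrap().write(buf)
       }
   
       fn flush(&mut self) -> io::Result<()> {
           self.buffer.lock().unwrap().flush()
       }
   }
   ```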



##########
parquet/src/file/mod.rs:
##########
@@ -48,12 +48,14 @@
 //! let props = Arc::new(WriterProperties::builder().build());
 //! let file = fs::File::create(&path).unwrap();
 //! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-//! let mut row_group_writer = writer.next_row_group().unwrap();
-//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
-//!     // ... write values to a column writer
-//!     row_group_writer.close_column(col_writer).unwrap();
+//! {

Review Comment:
   Also, it seems to me like we could change this example to actually run (I am not sure why it is marked `no_run` -- it just needs to use a tempfile as the target rather than `/path/to/sample.parquet`).
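   
   Something along these lines, for instance (a sketch; assumes the `tempfile` crate is available as a dev-dependency):
   
   ```rust
   // Write to a temporary file so the doc example can actually execute
   let dir = tempfile::tempdir().unwrap();
   let path = dir.path().join("sample.parquet");
   let file = fs::File::create(&path).unwrap();
   let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
   ```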



##########
parquet/src/file/writer.rs:
##########
@@ -270,69 +273,45 @@ impl<W: 'static + ParquetWriter> FileWriter for SerializedFileWriter<W> {
 /// A serialized implementation for Parquet [`RowGroupWriter`].
 /// Coordinates writing of a row group with column writers.
 /// See documentation on row group writer for more information.
-pub struct SerializedRowGroupWriter<W: ParquetWriter> {
+pub struct SerializedRowGroupWriter<'a, W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
-    buf: W,
+    buf: &'a mut TrackedWrite<W>,
     total_rows_written: Option<u64>,
     total_bytes_written: u64,
     column_index: usize,
-    previous_writer_closed: bool,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    on_close: Option<OnCloseRowGroup<'a>>,
 }
 
-impl<W: 'static + ParquetWriter> SerializedRowGroupWriter<W> {
+impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+    /// Creates a new `SerializedRowGroupWriter` with:
+    ///
+    /// - `schema_descr` - the schema to write
+    /// - `properties` - writer properties
+    /// - `buf` - the buffer to write data to
+    /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
     pub fn new(
         schema_descr: SchemaDescPtr,
         properties: WriterPropertiesPtr,
-        buf: &W,
+        buf: &'a mut TrackedWrite<W>,
+        on_close: Option<OnCloseRowGroup<'a>>,
     ) -> Self {
         let num_columns = schema_descr.num_columns();
         Self {
+            buf,
+            on_close,
+            total_rows_written: None,
             descr: schema_descr,
             props: properties,
-            buf: buf.try_clone().unwrap(),
-            total_rows_written: None,
-            total_bytes_written: 0,
             column_index: 0,
-            previous_writer_closed: true,
             row_group_metadata: None,
             column_chunks: Vec::with_capacity(num_columns),
+            total_bytes_written: 0,
         }
     }
 
-    /// Checks and finalises current column writer.
-    fn finalise_column_writer(&mut self, writer: ColumnWriter) -> Result<()> {

Review Comment:
   This code appears to have been inlined into `next_column` below



##########
parquet/src/util/io.rs:
##########
@@ -153,47 +153,6 @@ impl<R: ParquetReader> Length for FileSource<R> {
         self.end - self.start
     }
 }
-
-/// Struct that represents `File` output stream with position tracking.
-/// Used as a sink in file writer.

Review Comment:
   Likewise, I recommend leaving this structure in (and marking it as "deprecated") for a release or two, to give people a chance to update their code over time rather than all at once.
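   
   E.g. something like the following (a sketch; the struct and field names and the version string are placeholders based on the removed code):
   
   ```rust
   // Keep the old position-tracking sink around but warn on use
   #[deprecated(since = "15.0.0", note = "write to any `std::io::Write` directly instead")]
   pub struct FileSink {
       inner: std::fs::File,
       pos: u64, // position tracking, as before
   }
   ```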



##########
parquet/benches/arrow_writer.rs:
##########
@@ -278,8 +276,8 @@ fn _create_nested_bench_batch(
 #[inline]
 fn write_batch(batch: &RecordBatch) -> Result<()> {
     // Write batch to an in-memory writer
-    let cursor = InMemoryWriteableCursor::default();
-    let mut writer = ArrowWriter::try_new(cursor, batch.schema(), None)?;
+    let buffer = vec![];

Review Comment:
   Here is a nice example of the new API in action: use anything that implements `std::io::Write`.
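   
   For anyone following along, the new pattern looks roughly like this (a sketch; assumes a `batch: RecordBatch` is in scope and errors are propagated with `?`, as in this bench function):
   
   ```rust
   use parquet::arrow::ArrowWriter;
   
   // Any `std::io::Write` implementation works as a sink;
   // a plain Vec<u8> keeps the output in memory
   let mut buffer: Vec<u8> = vec![];
   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)?;
   writer.write(&batch)?;
   writer.close()?;
   // `buffer` now holds the serialized parquet bytes
   ```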



##########
parquet/src/file/writer.rs:
##########
@@ -214,7 +221,7 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
     }
 
     #[inline]
-    fn assert_closed(&self) -> Result<()> {
+    fn assert_not_closed(&self) -> Result<()> {

Review Comment:
   👍 



##########
parquet/src/file/writer.rs:
##########
@@ -196,13 +203,13 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
         };
 
         // Write file metadata
-        let start_pos = self.buf.seek(SeekFrom::Current(0))?;
+        let start_pos = self.buf.bytes_written();

Review Comment:
   ❤️ 



##########
parquet/src/file/writer.rs:
##########
@@ -270,69 +273,45 @@ impl<W: 'static + ParquetWriter> FileWriter for SerializedFileWriter<W> {
 /// A serialized implementation for Parquet [`RowGroupWriter`].
 /// Coordinates writing of a row group with column writers.
 /// See documentation on row group writer for more information.
-pub struct SerializedRowGroupWriter<W: ParquetWriter> {
+pub struct SerializedRowGroupWriter<'a, W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
-    buf: W,
+    buf: &'a mut TrackedWrite<W>,
     total_rows_written: Option<u64>,
     total_bytes_written: u64,
     column_index: usize,
-    previous_writer_closed: bool,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    on_close: Option<OnCloseRowGroup<'a>>,
 }
 
-impl<W: 'static + ParquetWriter> SerializedRowGroupWriter<W> {
+impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+    /// Creates a new `SerializedRowGroupWriter` with:
+    ///
+    /// - `schema_descr` - the schema to write
+    /// - `properties` - writer properties
+    /// - `buf` - the buffer to write data to
+    /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
     pub fn new(
         schema_descr: SchemaDescPtr,
         properties: WriterPropertiesPtr,
-        buf: &W,
+        buf: &'a mut TrackedWrite<W>,
+        on_close: Option<OnCloseRowGroup<'a>>,

Review Comment:
   why is this `Option` -- shouldn't it always be required?



##########
parquet/src/file/mod.rs:
##########
@@ -48,12 +48,14 @@
 //! let props = Arc::new(WriterProperties::builder().build());
 //! let file = fs::File::create(&path).unwrap();
 //! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-//! let mut row_group_writer = writer.next_row_group().unwrap();
-//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
-//!     // ... write values to a column writer
-//!     row_group_writer.close_column(col_writer).unwrap();
+//! {

Review Comment:
   Which trait indirection are you referring to?



##########
parquet/src/file/writer.rs:
##########
@@ -92,102 +111,90 @@ pub trait FileWriter {
 /// All columns should be written sequentially; the main workflow is:
 /// - Request the next column using `next_column` method - this will return `None` if no
 /// more columns are available to write.
-/// - Once done writing a column, close column writer with `close_column` method - this
-/// will finalise column chunk metadata and update row group metrics.
+/// - Once done writing a column, close column writer with `close`
 /// - Once all columns have been written, close row group writer with `close` method -
 /// it will return row group metadata and is no-op on already closed row group.
 pub trait RowGroupWriter {
     /// Returns the next column writer, if available; otherwise returns `None`.
     /// In case of any IO error or Thrift error, or if row group writer has already been
     /// closed returns `Err`.
-    ///
-    /// To request the next column writer, the previous one must be finalised and closed
-    /// using `close_column`.
-    fn next_column(&mut self) -> Result<Option<ColumnWriter>>;
-
-    /// Closes column writer that was created using `next_column` method.
-    /// This should be called before requesting the next column writer.
-    fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()>;
+    fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>>;
 
     /// Closes this row group writer and returns row group metadata.
     /// After calling this method row group writer must not be used.
     ///
-    /// It is recommended to call this method before requesting another row group, but it
-    /// will be closed automatically before returning a new row group.
-    ///
     /// Can be called multiple times. In subsequent calls will result in no-op and return
     /// already created row group metadata.
     fn close(&mut self) -> Result<RowGroupMetaDataPtr>;
 }
 
+/// Callback invoked on closing a column chunk, arguments are:

Review Comment:
   Also importantly, keeping a mutable reference lets the compiler prevent concurrent writes to the same writer, as reported in https://github.com/apache/arrow-rs/issues/1717.
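   
   Concretely, something like this is now rejected by the borrow checker, which is exactly what we want (a sketch; assumes a `writer: SerializedFileWriter<_>` in scope):
   
   ```rust
   {
       let mut rg1 = writer.next_row_group()?;
       // A second row group writer while `rg1` is live no longer compiles:
       // error[E0499]: cannot borrow `writer` as mutable more than once at a time
       // let mut rg2 = writer.next_row_group()?;
       rg1.close()?;
   } // mutable borrow of `writer` ends here
   let mut rg2 = writer.next_row_group()?; // fine once the first borrow ends
   rg2.close()?;
   ```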



##########
parquet/src/file/writer.rs:
##########
@@ -65,15 +91,8 @@ pub trait FileWriter {
     ///
     /// There is no limit on a number of row groups in a file; however, row groups have
     /// to be written sequentially. Every time the next row group is requested, the
-    /// previous row group must be finalised and closed using `close_row_group` method.
-    fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter>>;
-
-    /// Finalises and closes row group that was created using `next_row_group` method.
-    /// After calling this method, the next row group is available for writes.
-    fn close_row_group(
-        &mut self,
-        row_group_writer: Box<dyn RowGroupWriter>,
-    ) -> Result<()>;
+    /// previous row group must be finalised and closed using `RowGroupWriter::close` method.
+    fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter + '_>>;

Review Comment:
   It might help to leave a link here back to the docs in `parquet/src/file/mod.rs` to bring readers to an example.
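   
   E.g. an intra-doc link in the trait docs, roughly:
   
   ```rust
   /// See the example in the [`file`](crate::file) module documentation
   /// for the full write workflow.
   fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter + '_>>;
   ```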



##########
parquet_derive_test/src/lib.rs:
##########
@@ -131,9 +131,13 @@ mod tests {
         let mut writer =
             SerializedFileWriter::new(file, generated_schema, props).unwrap();
 
-        let mut row_group = writer.next_row_group().unwrap();
-        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
-        writer.close_row_group(row_group).unwrap();
+        {
+            let mut row_group = writer.next_row_group().unwrap();
+            drs.as_slice()
+                .write_to_row_group(row_group.as_mut())
+                .unwrap();
+            row_group.close().unwrap();

Review Comment:
   I do like this pattern better (close the row group writer rather than passing it back to the parquet file writer to close)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org