You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/05/25 08:33:18 UTC

[arrow-rs] branch master updated: Change parquet writers to use standard `std:io::Write` rather custom `ParquetWriter` trait (#1717) (#1163) (#1719)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 722fcfcf2 Change parquet writers to use standard `std:io::Write` rather custom `ParquetWriter` trait (#1717) (#1163) (#1719)
722fcfcf2 is described below

commit 722fcfcf2f55672c2bae626e3f652a3a792dff13
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed May 25 09:33:12 2022 +0100

    Change parquet writers to use standard `std:io::Write` rather custom `ParquetWriter` trait (#1717) (#1163) (#1719)
    
    * Rustify parquet writer (#1717) (#1163)
    
    * Fix parquet_derive
    
    * Fix benches
    
    * Fix parquet_derive tests
    
    * Use raw vec instead of Cursor
    
    * Review feedback
    
    * Fix unnecessary unwrap
---
 parquet/benches/arrow_writer.rs              |   8 +-
 parquet/src/arrow/array_reader/list_array.rs |   2 +-
 parquet/src/arrow/arrow_reader.rs            |  39 +-
 parquet/src/arrow/arrow_writer.rs            |  44 +-
 parquet/src/arrow/schema.rs                  |   4 +-
 parquet/src/column/mod.rs                    |  22 +-
 parquet/src/column/writer.rs                 |  76 ++--
 parquet/src/data_type.rs                     |  32 +-
 parquet/src/file/mod.rs                      |   6 +-
 parquet/src/file/writer.rs                   | 633 ++++++++++++---------------
 parquet/src/record/record_writer.rs          |   6 +-
 parquet/src/util/cursor.rs                   |   8 +-
 parquet/src/util/io.rs                       |  10 +-
 parquet/tests/boolean_writer.rs              |  25 +-
 parquet/tests/custom_writer.rs               | 100 -----
 parquet_derive/src/lib.rs                    |   6 +-
 parquet_derive/src/parquet_field.rs          |  24 +-
 parquet_derive_test/src/lib.rs               |   7 +-
 18 files changed, 448 insertions(+), 604 deletions(-)

diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs
index f1154eb9e..25ff1ca90 100644
--- a/parquet/benches/arrow_writer.rs
+++ b/parquet/benches/arrow_writer.rs
@@ -26,9 +26,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::*;
 use arrow::{record_batch::RecordBatch, util::data_gen::*};
-use parquet::{
-    arrow::ArrowWriter, errors::Result, file::writer::InMemoryWriteableCursor,
-};
+use parquet::{arrow::ArrowWriter, errors::Result};
 
 fn create_primitive_bench_batch(
     size: usize,
@@ -278,8 +276,8 @@ fn _create_nested_bench_batch(
 #[inline]
 fn write_batch(batch: &RecordBatch) -> Result<()> {
     // Write batch to an in-memory writer
-    let cursor = InMemoryWriteableCursor::default();
-    let mut writer = ArrowWriter::try_new(cursor, batch.schema(), None)?;
+    let buffer = vec![];
+    let mut writer = ArrowWriter::try_new(buffer, batch.schema(), None)?;
 
     writer.write(batch)?;
     writer.close()?;
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 2d199f69e..808f815e6 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -564,7 +564,7 @@ mod tests {
             .set_max_row_group_size(200)
             .build();
 
-        let mut writer = ArrowWriter::try_new(
+        let writer = ArrowWriter::try_new(
             file.try_clone().unwrap(),
             Arc::new(arrow_schema),
             Some(props),
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index e3a1d1233..34a14f372 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -263,7 +263,6 @@ mod tests {
     use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
     use crate::arrow::{ArrowWriter, ProjectionMask};
     use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
-    use crate::column::writer::get_typed_column_writer_mut;
     use crate::data_type::{
         BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
         FixedLenByteArrayType, Int32Type, Int64Type,
@@ -271,7 +270,7 @@ mod tests {
     use crate::errors::Result;
     use crate::file::properties::{WriterProperties, WriterVersion};
     use crate::file::reader::{FileReader, SerializedFileReader};
-    use crate::file::writer::{FileWriter, SerializedFileWriter};
+    use crate::file::writer::SerializedFileWriter;
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::{Type, TypePtr};
     use crate::util::cursor::SliceableCursor;
@@ -936,21 +935,24 @@ mod tests {
         for (idx, v) in values.iter().enumerate() {
             let def_levels = def_levels.map(|d| d[idx].as_slice());
             let mut row_group_writer = writer.next_row_group()?;
-            let mut column_writer = row_group_writer
-                .next_column()?
-                .expect("Column writer is none!");
+            {
+                let mut column_writer = row_group_writer
+                    .next_column()?
+                    .expect("Column writer is none!");
 
-            get_typed_column_writer_mut::<T>(&mut column_writer)
-                .write_batch(v, def_levels, None)?;
+                column_writer
+                    .typed::<T>()
+                    .write_batch(v, def_levels, None)?;
 
-            row_group_writer.close_column(column_writer)?;
-            writer.close_row_group(row_group_writer)?
+                column_writer.close()?;
+            }
+            row_group_writer.close()?;
         }
 
         writer.close()
     }
 
-    fn get_test_reader(file_name: &str) -> Arc<dyn FileReader> {
+    fn get_test_reader(file_name: &str) -> Arc<SerializedFileReader<File>> {
         let file = get_test_file(file_name);
 
         let reader =
@@ -1094,15 +1096,18 @@ mod tests {
             )
             .unwrap();
 
-            let mut row_group_writer = writer.next_row_group().unwrap();
-            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
+            {
+                let mut row_group_writer = writer.next_row_group().unwrap();
+                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
 
-            get_typed_column_writer_mut::<Int32Type>(&mut column_writer)
-                .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
-                .unwrap();
+                column_writer
+                    .typed::<Int32Type>()
+                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
+                    .unwrap();
 
-            row_group_writer.close_column(column_writer).unwrap();
-            writer.close_row_group(row_group_writer).unwrap();
+                column_writer.close().unwrap();
+                row_group_writer.close().unwrap();
+            }
 
             writer.close().unwrap();
         }
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index 1918c9675..e1fd93a9b 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -18,6 +18,7 @@
 //! Contains writer which writes arrow data into parquet data.
 
 use std::collections::VecDeque;
+use std::io::Write;
 use std::sync::Arc;
 
 use arrow::array as arrow_array;
@@ -35,10 +36,8 @@ use super::schema::{
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::WriterProperties;
-use crate::{
-    data_type::*,
-    file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
-};
+use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
+use crate::{data_type::*, file::writer::SerializedFileWriter};
 
 /// Arrow writer
 ///
@@ -46,7 +45,7 @@ use crate::{
 /// to produce row groups with `max_row_group_size` rows. Any remaining rows will be
 /// flushed on close, leading the final row group in the output file to potentially
 /// contain fewer than `max_row_group_size` rows
-pub struct ArrowWriter<W: ParquetWriter> {
+pub struct ArrowWriter<W: Write> {
     /// Underlying Parquet writer
     writer: SerializedFileWriter<W>,
 
@@ -65,7 +64,7 @@ pub struct ArrowWriter<W: ParquetWriter> {
     max_row_group_size: usize,
 }
 
-impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+impl<W: Write> ArrowWriter<W> {
     /// Try to create a new Arrow writer
     ///
     /// The writer will fail if:
@@ -185,17 +184,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
                 })
                 .collect();
 
-            write_leaves(row_group_writer.as_mut(), &arrays, &mut levels)?;
+            write_leaves(&mut row_group_writer, &arrays, &mut levels)?;
         }
 
-        self.writer.close_row_group(row_group_writer)?;
+        row_group_writer.close()?;
         self.buffered_rows -= num_rows;
 
         Ok(())
     }
 
     /// Close and finalize the underlying Parquet writer
-    pub fn close(&mut self) -> Result<parquet_format::FileMetaData> {
+    pub fn close(mut self) -> Result<parquet_format::FileMetaData> {
         self.flush()?;
         self.writer.close()
     }
@@ -203,15 +202,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
 
 /// Convenience method to get the next ColumnWriter from the RowGroupWriter
 #[inline]
-fn get_col_writer(row_group_writer: &mut dyn RowGroupWriter) -> Result<ColumnWriter> {
+fn get_col_writer<'a, W: Write>(
+    row_group_writer: &'a mut SerializedRowGroupWriter<'_, W>,
+) -> Result<SerializedColumnWriter<'a>> {
     let col_writer = row_group_writer
         .next_column()?
         .expect("Unable to get column writer");
     Ok(col_writer)
 }
 
-fn write_leaves(
-    row_group_writer: &mut dyn RowGroupWriter,
+fn write_leaves<W: Write>(
+    row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
     arrays: &[ArrayRef],
     levels: &mut [Vec<LevelInfo>],
 ) -> Result<()> {
@@ -250,12 +251,12 @@ fn write_leaves(
             let mut col_writer = get_col_writer(row_group_writer)?;
             for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
                 write_leaf(
-                    &mut col_writer,
+                    col_writer.untyped(),
                     array,
                     levels.pop().expect("Levels exhausted"),
                 )?;
             }
-            row_group_writer.close_column(col_writer)?;
+            col_writer.close()?;
             Ok(())
         }
         ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
@@ -313,12 +314,12 @@ fn write_leaves(
                 // cast dictionary to a primitive
                 let array = arrow::compute::cast(array, value_type)?;
                 write_leaf(
-                    &mut col_writer,
+                    col_writer.untyped(),
                     &array,
                     levels.pop().expect("Levels exhausted"),
                 )?;
             }
-            row_group_writer.close_column(col_writer)?;
+            col_writer.close()?;
             Ok(())
         }
         ArrowDataType::Float16 => Err(ParquetError::ArrowError(
@@ -336,8 +337,8 @@ fn write_leaves(
 }
 
 fn write_leaf(
-    writer: &mut ColumnWriter,
-    column: &arrow_array::ArrayRef,
+    writer: &mut ColumnWriter<'_>,
+    column: &ArrayRef,
     levels: LevelInfo,
 ) -> Result<i64> {
     let indices = levels.filter_array_indices();
@@ -705,7 +706,6 @@ mod tests {
     use crate::file::{
         reader::{FileReader, SerializedFileReader},
         statistics::Statistics,
-        writer::InMemoryWriteableCursor,
     };
 
     #[test]
@@ -744,16 +744,14 @@ mod tests {
         let expected_batch =
             RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
 
-        let cursor = InMemoryWriteableCursor::default();
+        let mut buffer = vec![];
 
         {
-            let mut writer = ArrowWriter::try_new(cursor.clone(), schema, None).unwrap();
+            let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
             writer.write(&expected_batch).unwrap();
             writer.close().unwrap();
         }
 
-        let buffer = cursor.into_inner().unwrap();
-
         let cursor = crate::file::serialized_reader::SliceableCursor::new(buffer);
         let reader = SerializedFileReader::new(cursor).unwrap();
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index 820aa7e7a..5416e4078 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -1591,7 +1591,7 @@ mod tests {
 
         // write to an empty parquet file so that schema is serialized
         let file = tempfile::tempfile().unwrap();
-        let mut writer = ArrowWriter::try_new(
+        let writer = ArrowWriter::try_new(
             file.try_clone().unwrap(),
             Arc::new(schema.clone()),
             None,
@@ -1660,7 +1660,7 @@ mod tests {
 
         // write to an empty parquet file so that schema is serialized
         let file = tempfile::tempfile().unwrap();
-        let mut writer = ArrowWriter::try_new(
+        let writer = ArrowWriter::try_new(
             file.try_clone().unwrap(),
             Arc::new(schema.clone()),
             None,
diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs
index 7ed7bfc25..93a4f00d2 100644
--- a/parquet/src/column/mod.rs
+++ b/parquet/src/column/mod.rs
@@ -40,10 +40,11 @@
 //!
 //! use parquet::{
 //!     column::{reader::ColumnReader, writer::ColumnWriter},
+//!     data_type::Int32Type,
 //!     file::{
 //!         properties::WriterProperties,
 //!         reader::{FileReader, SerializedFileReader},
-//!         writer::{FileWriter, SerializedFileWriter},
+//!         writer::SerializedFileWriter,
 //!     },
 //!     schema::parser::parse_message_type,
 //! };
@@ -65,20 +66,17 @@
 //! let props = Arc::new(WriterProperties::builder().build());
 //! let file = fs::File::create(path).unwrap();
 //! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
+//!
 //! let mut row_group_writer = writer.next_row_group().unwrap();
 //! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
-//!     match col_writer {
-//!         // You can also use `get_typed_column_writer` method to extract typed writer.
-//!         ColumnWriter::Int32ColumnWriter(ref mut typed_writer) => {
-//!             typed_writer
-//!                 .write_batch(&[1, 2, 3], Some(&[3, 3, 3, 2, 2]), Some(&[0, 1, 0, 1, 1]))
-//!                 .unwrap();
-//!         }
-//!         _ => {}
-//!     }
-//!     row_group_writer.close_column(col_writer).unwrap();
+//!     col_writer
+//!         .typed::<Int32Type>()
+//!         .write_batch(&[1, 2, 3], Some(&[3, 3, 3, 2, 2]), Some(&[0, 1, 0, 1, 1]))
+//!         .unwrap();
+//!     col_writer.close().unwrap();
 //! }
-//! writer.close_row_group(row_group_writer).unwrap();
+//! row_group_writer.close().unwrap();
+//!
 //! writer.close().unwrap();
 //!
 //! // Reading data using column reader API.
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index a7d0ba8fc..d80cafe0e 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -39,15 +39,15 @@ use crate::util::bit_util::FromBytes;
 use crate::util::memory::ByteBufferPtr;
 
 /// Column writer for a Parquet type.
-pub enum ColumnWriter {
-    BoolColumnWriter(ColumnWriterImpl<BoolType>),
-    Int32ColumnWriter(ColumnWriterImpl<Int32Type>),
-    Int64ColumnWriter(ColumnWriterImpl<Int64Type>),
-    Int96ColumnWriter(ColumnWriterImpl<Int96Type>),
-    FloatColumnWriter(ColumnWriterImpl<FloatType>),
-    DoubleColumnWriter(ColumnWriterImpl<DoubleType>),
-    ByteArrayColumnWriter(ColumnWriterImpl<ByteArrayType>),
-    FixedLenByteArrayColumnWriter(ColumnWriterImpl<FixedLenByteArrayType>),
+pub enum ColumnWriter<'a> {
+    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
+    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
+    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
+    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
+    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
+    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
+    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
+    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
 }
 
 pub enum Level {
@@ -76,11 +76,11 @@ macro_rules! gen_stats_section {
 }
 
 /// Gets a specific column writer corresponding to column descriptor `descr`.
-pub fn get_column_writer(
+pub fn get_column_writer<'a>(
     descr: ColumnDescPtr,
     props: WriterPropertiesPtr,
-    page_writer: Box<dyn PageWriter>,
-) -> ColumnWriter {
+    page_writer: Box<dyn PageWriter + 'a>,
+) -> ColumnWriter<'a> {
     match descr.physical_type() {
         Type::BOOLEAN => ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(
             descr,
@@ -139,9 +139,9 @@ pub fn get_typed_column_writer<T: DataType>(
 }
 
 /// Similar to `get_typed_column_writer` but returns a reference.
-pub fn get_typed_column_writer_ref<T: DataType>(
-    col_writer: &ColumnWriter,
-) -> &ColumnWriterImpl<T> {
+pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
+    col_writer: &'b ColumnWriter<'a>,
+) -> &'b ColumnWriterImpl<'a, T> {
     T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
         panic!(
             "Failed to convert column writer into a typed column writer for `{}` type",
@@ -151,9 +151,9 @@ pub fn get_typed_column_writer_ref<T: DataType>(
 }
 
 /// Similar to `get_typed_column_writer` but returns a reference.
-pub fn get_typed_column_writer_mut<T: DataType>(
-    col_writer: &mut ColumnWriter,
-) -> &mut ColumnWriterImpl<T> {
+pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
+    col_writer: &'a mut ColumnWriter<'b>,
+) -> &'a mut ColumnWriterImpl<'b, T> {
     T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
         panic!(
             "Failed to convert column writer into a typed column writer for `{}` type",
@@ -163,11 +163,11 @@ pub fn get_typed_column_writer_mut<T: DataType>(
 }
 
 /// Typed column writer for a primitive column.
-pub struct ColumnWriterImpl<T: DataType> {
+pub struct ColumnWriterImpl<'a, T: DataType> {
     // Column writer properties
     descr: ColumnDescPtr,
     props: WriterPropertiesPtr,
-    page_writer: Box<dyn PageWriter>,
+    page_writer: Box<dyn PageWriter + 'a>,
     has_dictionary: bool,
     dict_encoder: Option<DictEncoder<T>>,
     encoder: Box<dyn Encoder<T>>,
@@ -200,11 +200,11 @@ pub struct ColumnWriterImpl<T: DataType> {
     _phantom: PhantomData<T>,
 }
 
-impl<T: DataType> ColumnWriterImpl<T> {
+impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     pub fn new(
         descr: ColumnDescPtr,
         props: WriterPropertiesPtr,
-        page_writer: Box<dyn PageWriter>,
+        page_writer: Box<dyn PageWriter + 'a>,
     ) -> Self {
         let codec = props.compression(descr.path());
         let compressor = create_codec(codec).unwrap();
@@ -1140,15 +1140,13 @@ mod tests {
         page::PageReader,
         reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
     };
+    use crate::file::writer::TrackedWrite;
     use crate::file::{
         properties::WriterProperties, reader::SerializedPageReader,
         writer::SerializedPageWriter,
     };
     use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
-    use crate::util::{
-        io::{FileSink, FileSource},
-        test_common::random_numbers_range,
-    };
+    use crate::util::{io::FileSource, test_common::random_numbers_range};
 
     use super::*;
 
@@ -1825,9 +1823,9 @@ mod tests {
     fn test_column_writer_add_data_pages_with_dict() {
         // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
         // and no fallback occurred so far.
-        let file = tempfile::tempfile().unwrap();
-        let sink = FileSink::new(&file);
-        let page_writer = Box::new(SerializedPageWriter::new(sink));
+        let mut file = tempfile::tempfile().unwrap();
+        let mut writer = TrackedWrite::new(&mut file);
+        let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
         let props = Arc::new(
             WriterProperties::builder()
                 .set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes
@@ -2120,9 +2118,9 @@ mod tests {
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
     ) {
-        let file = tempfile::tempfile().unwrap();
-        let sink = FileSink::new(&file);
-        let page_writer = Box::new(SerializedPageWriter::new(sink));
+        let mut file = tempfile::tempfile().unwrap();
+        let mut writer = TrackedWrite::new(&mut file);
+        let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
 
         let max_def_level = match def_levels {
             Some(buf) => *buf.iter().max().unwrap_or(&0i16),
@@ -2257,12 +2255,12 @@ mod tests {
     }
 
     /// Returns column writer.
-    fn get_test_column_writer<T: DataType>(
-        page_writer: Box<dyn PageWriter>,
+    fn get_test_column_writer<'a, T: DataType>(
+        page_writer: Box<dyn PageWriter + 'a>,
         max_def_level: i16,
         max_rep_level: i16,
         props: WriterPropertiesPtr,
-    ) -> ColumnWriterImpl<T> {
+    ) -> ColumnWriterImpl<'a, T> {
         let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
         let column_writer = get_column_writer(descr, props, page_writer);
         get_typed_column_writer::<T>(column_writer)
@@ -2343,7 +2341,7 @@ mod tests {
         max_def_level: i16,
         max_rep_level: i16,
         props: WriterPropertiesPtr,
-    ) -> ColumnWriterImpl<T> {
+    ) -> ColumnWriterImpl<'static, T> {
         let descr = Arc::new(get_test_decimals_column_descr::<T>(
             max_def_level,
             max_rep_level,
@@ -2386,12 +2384,12 @@ mod tests {
     }
 
     /// Returns column writer for UINT32 Column provided as ConvertedType only
-    fn get_test_unsigned_int_given_as_converted_column_writer<T: DataType>(
-        page_writer: Box<dyn PageWriter>,
+    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
+        page_writer: Box<dyn PageWriter + 'a>,
         max_def_level: i16,
         max_rep_level: i16,
         props: WriterPropertiesPtr,
-    ) -> ColumnWriterImpl<T> {
+    ) -> ColumnWriterImpl<'a, T> {
         let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
             max_def_level,
             max_rep_level,
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 28645a262..c01fb1530 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -1037,19 +1037,21 @@ pub trait DataType: 'static + Send {
     where
         Self: Sized;
 
-    fn get_column_writer(column_writer: ColumnWriter) -> Option<ColumnWriterImpl<Self>>
+    fn get_column_writer(
+        column_writer: ColumnWriter<'_>,
+    ) -> Option<ColumnWriterImpl<'_, Self>>
     where
         Self: Sized;
 
-    fn get_column_writer_ref(
-        column_writer: &ColumnWriter,
-    ) -> Option<&ColumnWriterImpl<Self>>
+    fn get_column_writer_ref<'a, 'b: 'a>(
+        column_writer: &'b ColumnWriter<'a>,
+    ) -> Option<&'b ColumnWriterImpl<'a, Self>>
     where
         Self: Sized;
 
-    fn get_column_writer_mut(
-        column_writer: &mut ColumnWriter,
-    ) -> Option<&mut ColumnWriterImpl<Self>>
+    fn get_column_writer_mut<'a, 'b: 'a>(
+        column_writer: &'a mut ColumnWriter<'b>,
+    ) -> Option<&'a mut ColumnWriterImpl<'b, Self>>
     where
         Self: Sized;
 }
@@ -1094,26 +1096,26 @@ macro_rules! make_type {
             }
 
             fn get_column_writer(
-                column_writer: ColumnWriter,
-            ) -> Option<ColumnWriterImpl<Self>> {
+                column_writer: ColumnWriter<'_>,
+            ) -> Option<ColumnWriterImpl<'_, Self>> {
                 match column_writer {
                     ColumnWriter::$writer_ident(w) => Some(w),
                     _ => None,
                 }
             }
 
-            fn get_column_writer_ref(
-                column_writer: &ColumnWriter,
-            ) -> Option<&ColumnWriterImpl<Self>> {
+            fn get_column_writer_ref<'a, 'b: 'a>(
+                column_writer: &'a ColumnWriter<'b>,
+            ) -> Option<&'a ColumnWriterImpl<'b, Self>> {
                 match column_writer {
                     ColumnWriter::$writer_ident(w) => Some(w),
                     _ => None,
                 }
             }
 
-            fn get_column_writer_mut(
-                column_writer: &mut ColumnWriter,
-            ) -> Option<&mut ColumnWriterImpl<Self>> {
+            fn get_column_writer_mut<'a, 'b: 'a>(
+                column_writer: &'a mut ColumnWriter<'b>,
+            ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> {
                 match column_writer {
                     ColumnWriter::$writer_ident(w) => Some(w),
                     _ => None,
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index 78fb7ef11..d293dc773 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -32,7 +32,7 @@
 //! use parquet::{
 //!     file::{
 //!         properties::WriterProperties,
-//!         writer::{FileWriter, SerializedFileWriter},
+//!         writer::SerializedFileWriter,
 //!     },
 //!     schema::parser::parse_message_type,
 //! };
@@ -51,9 +51,9 @@
 //! let mut row_group_writer = writer.next_row_group().unwrap();
 //! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
 //!     // ... write values to a column writer
-//!     row_group_writer.close_column(col_writer).unwrap();
+//!     col_writer.close().unwrap()
 //! }
-//! writer.close_row_group(row_group_writer).unwrap();
+//! row_group_writer.close().unwrap();
 //! writer.close().unwrap();
 //!
 //! let bytes = fs::read(&path).unwrap();
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index c6d0d1066..646550dcb 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,176 +18,176 @@
 //! Contains file writer API, and provides methods to write row groups and columns by
 //! using row group writers and column writers respectively.
 
-use std::{
-    io::{Seek, SeekFrom, Write},
-    sync::Arc,
-};
+use std::{io::Write, sync::Arc};
 
 use byteorder::{ByteOrder, LittleEndian};
 use parquet_format as parquet;
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
 
 use crate::basic::PageType;
+use crate::column::writer::{get_typed_column_writer_mut, ColumnWriterImpl};
 use crate::column::{
     page::{CompressedPage, Page, PageWriteSpec, PageWriter},
     writer::{get_column_writer, ColumnWriter},
 };
+use crate::data_type::DataType;
 use crate::errors::{ParquetError, Result};
 use crate::file::{
     metadata::*, properties::WriterPropertiesPtr,
     statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC,
 };
 use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr};
-use crate::util::io::{FileSink, Position};
-
-// Exposed publically so client code can implement [`ParquetWriter`]
-pub use crate::util::io::TryClone;
+use crate::util::io::TryClone;
 
-// Exposed publically for convenience of writing Parquet to a buffer of bytes
-pub use crate::util::cursor::InMemoryWriteableCursor;
+/// A wrapper around a [`Write`] that keeps track of the number
+/// of bytes that have been written
+pub struct TrackedWrite<W> {
+    inner: W,
+    bytes_written: usize,
+}
 
-// ----------------------------------------------------------------------
-// APIs for file & row group writers
+impl<W: Write> TrackedWrite<W> {
+    /// Create a new [`TrackedWrite`] from a [`Write`]
+    pub fn new(inner: W) -> Self {
+        Self {
+            inner,
+            bytes_written: 0,
+        }
+    }
 
-/// Parquet file writer API.
-/// Provides methods to write row groups sequentially.
-///
-/// The main workflow should be as following:
-/// - Create file writer, this will open a new file and potentially write some metadata.
-/// - Request a new row group writer by calling `next_row_group`.
-/// - Once finished writing row group, close row group writer by passing it into
-/// `close_row_group` method - this will finalise row group metadata and update metrics.
-/// - Write subsequent row groups, if necessary.
-/// - After all row groups have been written, close the file writer using `close` method.
-pub trait FileWriter {
-    /// Creates new row group from this file writer.
-    /// In case of IO error or Thrift error, returns `Err`.
-    ///
-    /// There is no limit on a number of row groups in a file; however, row groups have
-    /// to be written sequentially. Every time the next row group is requested, the
-    /// previous row group must be finalised and closed using `close_row_group` method.
-    fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter>>;
+    /// Returns the number of bytes written to this instance
+    pub fn bytes_written(&self) -> usize {
+        self.bytes_written
+    }
+}
 
-    /// Finalises and closes row group that was created using `next_row_group` method.
-    /// After calling this method, the next row group is available for writes.
-    fn close_row_group(
-        &mut self,
-        row_group_writer: Box<dyn RowGroupWriter>,
-    ) -> Result<()>;
+impl<W: Write> Write for TrackedWrite<W> {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        let bytes = self.inner.write(buf)?;
+        self.bytes_written += bytes;
+        Ok(bytes)
+    }
 
-    /// Closes and finalises file writer, returning the file metadata.
-    ///
-    /// All row groups must be appended before this method is called.
-    /// No writes are allowed after this point.
-    ///
-    /// Can be called multiple times. It is up to implementation to either result in
-    /// no-op, or return an `Err` for subsequent calls.
-    fn close(&mut self) -> Result<parquet::FileMetaData>;
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.inner.flush()
+    }
 }
 
-/// Parquet row group writer API.
-/// Provides methods to access column writers in an iterator-like fashion, order is
-/// guaranteed to match the order of schema leaves (column descriptors).
+/// Callback invoked on closing a column chunk, arguments are:
 ///
-/// All columns should be written sequentially; the main workflow is:
-/// - Request the next column using `next_column` method - this will return `None` if no
-/// more columns are available to write.
-/// - Once done writing a column, close column writer with `close_column` method - this
-/// will finalise column chunk metadata and update row group metrics.
-/// - Once all columns have been written, close row group writer with `close` method -
-/// it will return row group metadata and is no-op on already closed row group.
-pub trait RowGroupWriter {
-    /// Returns the next column writer, if available; otherwise returns `None`.
-    /// In case of any IO error or Thrift error, or if row group writer has already been
-    /// closed returns `Err`.
-    ///
-    /// To request the next column writer, the previous one must be finalised and closed
-    /// using `close_column`.
-    fn next_column(&mut self) -> Result<Option<ColumnWriter>>;
+/// - the number of bytes written
+/// - the number of rows written
+/// - the column chunk metadata
+///
+pub type OnCloseColumnChunk<'a> =
+    Box<dyn FnOnce(u64, u64, ColumnChunkMetaData) -> Result<()> + 'a>;
 
-    /// Closes column writer that was created using `next_column` method.
-    /// This should be called before requesting the next column writer.
-    fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()>;
+/// Callback invoked on closing a row group, arguments are:
+///
+/// - the row group metadata
+pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaDataPtr) -> Result<()> + 'a>;
 
-    /// Closes this row group writer and returns row group metadata.
-    /// After calling this method row group writer must not be used.
-    ///
-    /// It is recommended to call this method before requesting another row group, but it
-    /// will be closed automatically before returning a new row group.
-    ///
-    /// Can be called multiple times. In subsequent calls will result in no-op and return
-    /// already created row group metadata.
-    fn close(&mut self) -> Result<RowGroupMetaDataPtr>;
-}
+#[deprecated = "use std::io::Write"]
+pub trait ParquetWriter: Write + std::io::Seek + TryClone {}
+#[allow(deprecated)]
+impl<T: Write + std::io::Seek + TryClone> ParquetWriter for T {}
 
 // ----------------------------------------------------------------------
 // Serialized impl for file & row group writers
 
-pub trait ParquetWriter: Write + Seek + TryClone {}
-impl<T: Write + Seek + TryClone> ParquetWriter for T {}
-
-/// A serialized implementation for Parquet [`FileWriter`].
-/// See documentation on file writer for more information.
-pub struct SerializedFileWriter<W: ParquetWriter> {
-    buf: W,
+/// Parquet file writer API.
+/// Provides methods to write row groups sequentially.
+///
+/// The main workflow should be as following:
+/// - Create file writer, this will open a new file and potentially write some metadata.
+/// - Request a new row group writer by calling `next_row_group`.
+/// - Once finished writing row group, close row group writer by calling `close`
+/// - Write subsequent row groups, if necessary.
+/// - After all row groups have been written, close the file writer using `close` method.
+pub struct SerializedFileWriter<W: Write> {
+    buf: TrackedWrite<W>,
     schema: TypePtr,
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
-    total_num_rows: i64,
     row_groups: Vec<RowGroupMetaDataPtr>,
-    previous_writer_closed: bool,
-    is_closed: bool,
+    row_group_index: usize,
 }
 
-impl<W: ParquetWriter> SerializedFileWriter<W> {
+impl<W: Write> SerializedFileWriter<W> {
     /// Creates new file writer.
-    pub fn new(
-        mut buf: W,
-        schema: TypePtr,
-        properties: WriterPropertiesPtr,
-    ) -> Result<Self> {
+    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
+        let mut buf = TrackedWrite::new(buf);
         Self::start_file(&mut buf)?;
         Ok(Self {
             buf,
             schema: schema.clone(),
             descr: Arc::new(SchemaDescriptor::new(schema)),
             props: properties,
-            total_num_rows: 0,
-            row_groups: Vec::new(),
-            previous_writer_closed: true,
-            is_closed: false,
+            row_groups: vec![],
+            row_group_index: 0,
         })
     }
 
-    /// Writes magic bytes at the beginning of the file.
-    fn start_file(buf: &mut W) -> Result<()> {
-        buf.write_all(&PARQUET_MAGIC)?;
-        Ok(())
+    /// Creates new row group from this file writer.
+    /// In case of IO error or Thrift error, returns `Err`.
+    ///
+    /// There is no limit on a number of row groups in a file; however, row groups have
+    /// to be written sequentially. Every time the next row group is requested, the
+    /// previous row group must be finalised and closed using `RowGroupWriter::close` method.
+    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
+        self.assert_previous_writer_closed()?;
+        self.row_group_index += 1;
+
+        let row_groups = &mut self.row_groups;
+        let on_close = |metadata| {
+            row_groups.push(metadata);
+            Ok(())
+        };
+
+        let row_group_writer = SerializedRowGroupWriter::new(
+            self.descr.clone(),
+            self.props.clone(),
+            &mut self.buf,
+            Some(Box::new(on_close)),
+        );
+        Ok(row_group_writer)
+    }
+
+    /// Closes and finalises file writer, returning the file metadata.
+    ///
+    /// All row groups must be appended before this method is called.
+    /// No writes are allowed after this point.
+    ///
+    /// Can be called multiple times. It is up to implementation to either result in
+    /// no-op, or return an `Err` for subsequent calls.
+    pub fn close(mut self) -> Result<parquet::FileMetaData> {
+        self.assert_previous_writer_closed()?;
+        let metadata = self.write_metadata()?;
+        Ok(metadata)
     }
 
-    /// Finalises active row group writer, otherwise no-op.
-    fn finalise_row_group_writer(
-        &mut self,
-        mut row_group_writer: Box<dyn RowGroupWriter>,
-    ) -> Result<()> {
-        let row_group_metadata = row_group_writer.close()?;
-        self.total_num_rows += row_group_metadata.num_rows();
-        self.row_groups.push(row_group_metadata);
+    /// Writes magic bytes at the beginning of the file.
+    fn start_file(buf: &mut TrackedWrite<W>) -> Result<()> {
+        buf.write_all(&PARQUET_MAGIC)?;
         Ok(())
     }
 
     /// Assembles and writes metadata at the end of the file.
     fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
+        let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
+
+        let row_groups = self
+            .row_groups
+            .as_slice()
+            .iter()
+            .map(|v| v.to_thrift())
+            .collect();
+
         let file_metadata = parquet::FileMetaData {
+            num_rows,
+            row_groups,
             version: self.props.writer_version().as_num(),
             schema: types::to_thrift(self.schema.as_ref())?,
-            num_rows: self.total_num_rows as i64,
-            row_groups: self
-                .row_groups
-                .as_slice()
-                .iter()
-                .map(|v| v.to_thrift())
-                .collect(),
             key_value_metadata: self.props.key_value_metadata().cloned(),
             created_by: Some(self.props.created_by().to_owned()),
             column_orders: None,
@@ -196,13 +196,13 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
         };
 
         // Write file metadata
-        let start_pos = self.buf.seek(SeekFrom::Current(0))?;
+        let start_pos = self.buf.bytes_written();
         {
             let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
             file_metadata.write_to_out_protocol(&mut protocol)?;
             protocol.flush()?;
         }
-        let end_pos = self.buf.seek(SeekFrom::Current(0))?;
+        let end_pos = self.buf.bytes_written();
 
         // Write footer
         let mut footer_buffer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE];
@@ -213,18 +213,9 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
         Ok(file_metadata)
     }
 
-    #[inline]
-    fn assert_closed(&self) -> Result<()> {
-        if self.is_closed {
-            Err(general_err!("File writer is closed"))
-        } else {
-            Ok(())
-        }
-    }
-
     #[inline]
     fn assert_previous_writer_closed(&self) -> Result<()> {
-        if !self.previous_writer_closed {
+        if self.row_group_index != self.row_groups.len() {
             Err(general_err!("Previous row group writer was not closed"))
         } else {
             Ok(())
@@ -232,157 +223,107 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
     }
 }
 
-impl<W: 'static + ParquetWriter> FileWriter for SerializedFileWriter<W> {
-    #[inline]
-    fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter>> {
-        self.assert_closed()?;
-        self.assert_previous_writer_closed()?;
-        let row_group_writer = SerializedRowGroupWriter::new(
-            self.descr.clone(),
-            self.props.clone(),
-            &self.buf,
-        );
-        self.previous_writer_closed = false;
-        Ok(Box::new(row_group_writer))
-    }
-
-    #[inline]
-    fn close_row_group(
-        &mut self,
-        row_group_writer: Box<dyn RowGroupWriter>,
-    ) -> Result<()> {
-        self.assert_closed()?;
-        let res = self.finalise_row_group_writer(row_group_writer);
-        self.previous_writer_closed = res.is_ok();
-        res
-    }
-
-    #[inline]
-    fn close(&mut self) -> Result<parquet::FileMetaData> {
-        self.assert_closed()?;
-        self.assert_previous_writer_closed()?;
-        let metadata = self.write_metadata()?;
-        self.is_closed = true;
-        Ok(metadata)
-    }
-}
-
-/// A serialized implementation for Parquet [`RowGroupWriter`].
-/// Coordinates writing of a row group with column writers.
-/// See documentation on row group writer for more information.
-pub struct SerializedRowGroupWriter<W: ParquetWriter> {
+/// Parquet row group writer API.
+/// Provides methods to access column writers in an iterator-like fashion, order is
+/// guaranteed to match the order of schema leaves (column descriptors).
+///
+/// All columns should be written sequentially; the main workflow is:
+/// - Request the next column using `next_column` method - this will return `None` if no
+/// more columns are available to write.
+/// - Once done writing a column, close column writer with `close`
+/// - Once all columns have been written, close row group writer with `close` method -
+/// it will return row group metadata and is no-op on already closed row group.
+pub struct SerializedRowGroupWriter<'a, W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
-    buf: W,
+    buf: &'a mut TrackedWrite<W>,
     total_rows_written: Option<u64>,
     total_bytes_written: u64,
     column_index: usize,
-    previous_writer_closed: bool,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    on_close: Option<OnCloseRowGroup<'a>>,
 }
 
-impl<W: 'static + ParquetWriter> SerializedRowGroupWriter<W> {
+impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+    /// Creates a new `SerializedRowGroupWriter` with:
+    ///
+    /// - `schema_descr` - the schema to write
+    /// - `properties` - writer properties
+    /// - `buf` - the buffer to write data to
+    /// - `on_close` - an optional callback that will invoked on [`Self::close`]
     pub fn new(
         schema_descr: SchemaDescPtr,
         properties: WriterPropertiesPtr,
-        buf: &W,
+        buf: &'a mut TrackedWrite<W>,
+        on_close: Option<OnCloseRowGroup<'a>>,
     ) -> Self {
         let num_columns = schema_descr.num_columns();
         Self {
+            buf,
+            on_close,
+            total_rows_written: None,
             descr: schema_descr,
             props: properties,
-            buf: buf.try_clone().unwrap(),
-            total_rows_written: None,
-            total_bytes_written: 0,
             column_index: 0,
-            previous_writer_closed: true,
             row_group_metadata: None,
             column_chunks: Vec::with_capacity(num_columns),
+            total_bytes_written: 0,
         }
     }
 
-    /// Checks and finalises current column writer.
-    fn finalise_column_writer(&mut self, writer: ColumnWriter) -> Result<()> {
-        let (bytes_written, rows_written, metadata) = match writer {
-            ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
-            ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
-            ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
-            ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
-        };
-
-        // Update row group writer metrics
-        self.total_bytes_written += bytes_written;
-        self.column_chunks.push(metadata);
-        if let Some(rows) = self.total_rows_written {
-            if rows != rows_written {
-                return Err(general_err!(
-                    "Incorrect number of rows, expected {} != {} rows",
-                    rows,
-                    rows_written
-                ));
-            }
-        } else {
-            self.total_rows_written = Some(rows_written);
-        }
-
-        Ok(())
-    }
-
-    #[inline]
-    fn assert_closed(&self) -> Result<()> {
-        if self.row_group_metadata.is_some() {
-            Err(general_err!("Row group writer is closed"))
-        } else {
-            Ok(())
-        }
-    }
-
-    #[inline]
-    fn assert_previous_writer_closed(&self) -> Result<()> {
-        if !self.previous_writer_closed {
-            Err(general_err!("Previous column writer was not closed"))
-        } else {
-            Ok(())
-        }
-    }
-}
-
-impl<W: 'static + ParquetWriter> RowGroupWriter for SerializedRowGroupWriter<W> {
-    #[inline]
-    fn next_column(&mut self) -> Result<Option<ColumnWriter>> {
-        self.assert_closed()?;
+    /// Returns the next column writer, if available; otherwise returns `None`.
+    /// In case of any IO error or Thrift error, or if row group writer has already been
+    /// closed returns `Err`.
+    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
         self.assert_previous_writer_closed()?;
 
         if self.column_index >= self.descr.num_columns() {
             return Ok(None);
         }
-        let sink = FileSink::new(&self.buf);
-        let page_writer = Box::new(SerializedPageWriter::new(sink));
+        let page_writer = Box::new(SerializedPageWriter::new(self.buf));
         let column_writer = get_column_writer(
             self.descr.column(self.column_index),
             self.props.clone(),
             page_writer,
         );
         self.column_index += 1;
-        self.previous_writer_closed = false;
 
-        Ok(Some(column_writer))
-    }
+        let total_bytes_written = &mut self.total_bytes_written;
+        let total_rows_written = &mut self.total_rows_written;
+        let column_chunks = &mut self.column_chunks;
+
+        let on_close = |bytes_written, rows_written, metadata| {
+            // Update row group writer metrics
+            *total_bytes_written += bytes_written;
+            column_chunks.push(metadata);
+            if let Some(rows) = *total_rows_written {
+                if rows != rows_written {
+                    return Err(general_err!(
+                        "Incorrect number of rows, expected {} != {} rows",
+                        rows,
+                        rows_written
+                    ));
+                }
+            } else {
+                *total_rows_written = Some(rows_written);
+            }
 
-    #[inline]
-    fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()> {
-        let res = self.finalise_column_writer(column_writer);
-        self.previous_writer_closed = res.is_ok();
-        res
+            Ok(())
+        };
+
+        Ok(Some(SerializedColumnWriter::new(
+            column_writer,
+            Some(Box::new(on_close)),
+        )))
     }
 
-    #[inline]
-    fn close(&mut self) -> Result<RowGroupMetaDataPtr> {
+    /// Closes this row group writer and returns row group metadata.
+    /// After calling this method row group writer must not be used.
+    ///
+    /// Can be called multiple times. In subsequent calls will result in no-op and return
+    /// already created row group metadata.
+    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
         if self.row_group_metadata.is_none() {
             self.assert_previous_writer_closed()?;
 
@@ -393,25 +334,86 @@ impl<W: 'static + ParquetWriter> RowGroupWriter for SerializedRowGroupWriter<W>
                 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                 .build()?;
 
-            self.row_group_metadata = Some(Arc::new(row_group_metadata));
+            let metadata = Arc::new(row_group_metadata);
+            self.row_group_metadata = Some(metadata.clone());
+
+            if let Some(on_close) = self.on_close.take() {
+                on_close(metadata)?
+            }
         }
 
         let metadata = self.row_group_metadata.as_ref().unwrap().clone();
         Ok(metadata)
     }
+
+    #[inline]
+    fn assert_previous_writer_closed(&self) -> Result<()> {
+        if self.column_index != self.column_chunks.len() {
+            Err(general_err!("Previous column writer was not closed"))
+        } else {
+            Ok(())
+        }
+    }
+}
+
+/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
+pub struct SerializedColumnWriter<'a> {
+    inner: ColumnWriter<'a>,
+    on_close: Option<OnCloseColumnChunk<'a>>,
+}
+
+impl<'a> SerializedColumnWriter<'a> {
+    /// Create a new [`SerializedColumnWriter`] from a `[`ColumnWriter`] and an
+    /// optional callback to be invoked on [`Self::close`]
+    pub fn new(
+        inner: ColumnWriter<'a>,
+        on_close: Option<OnCloseColumnChunk<'a>>,
+    ) -> Self {
+        Self { inner, on_close }
+    }
+
+    /// Returns a reference to an untyped [`ColumnWriter`]
+    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
+        &mut self.inner
+    }
+
+    /// Returns a reference to a typed [`ColumnWriterImpl`]
+    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
+        get_typed_column_writer_mut(&mut self.inner)
+    }
+
+    /// Close this [`SerializedColumnWriter]
+    pub fn close(mut self) -> Result<()> {
+        let (bytes_written, rows_written, metadata) = match self.inner {
+            ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
+            ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
+            ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
+            ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
+            ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
+            ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
+            ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
+            ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
+        };
+
+        if let Some(on_close) = self.on_close.take() {
+            on_close(bytes_written, rows_written, metadata)?
+        }
+
+        Ok(())
+    }
 }
 
 /// A serialized implementation for Parquet [`PageWriter`].
 /// Writes and serializes pages and metadata into output stream.
 ///
 /// `SerializedPageWriter` should not be used after calling `close()`.
-pub struct SerializedPageWriter<T: Write + Position> {
-    sink: T,
+pub struct SerializedPageWriter<'a, W> {
+    sink: &'a mut TrackedWrite<W>,
 }
 
-impl<T: Write + Position> SerializedPageWriter<T> {
+impl<'a, W: Write> SerializedPageWriter<'a, W> {
     /// Creates new page writer.
-    pub fn new(sink: T) -> Self {
+    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
         Self { sink }
     }
 
@@ -419,13 +421,13 @@ impl<T: Write + Position> SerializedPageWriter<T> {
     /// Returns number of bytes that have been written into the sink.
     #[inline]
     fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
-        let start_pos = self.sink.pos();
+        let start_pos = self.sink.bytes_written();
         {
             let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
             header.write_to_out_protocol(&mut protocol)?;
             protocol.flush()?;
         }
-        Ok((self.sink.pos() - start_pos) as usize)
+        Ok(self.sink.bytes_written() - start_pos)
     }
 
     /// Serializes column chunk into Thrift.
@@ -439,7 +441,7 @@ impl<T: Write + Position> SerializedPageWriter<T> {
     }
 }
 
-impl<T: Write + Position> PageWriter for SerializedPageWriter<T> {
+impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
         let uncompressed_size = page.uncompressed_size();
         let compressed_size = page.compressed_size();
@@ -506,7 +508,7 @@ impl<T: Write + Position> PageWriter for SerializedPageWriter<T> {
             }
         }
 
-        let start_pos = self.sink.pos();
+        let start_pos = self.sink.bytes_written() as u64;
 
         let header_size = self.serialize_page_header(page_header)?;
         self.sink.write_all(page.data())?;
@@ -516,7 +518,7 @@ impl<T: Write + Position> PageWriter for SerializedPageWriter<T> {
         spec.uncompressed_size = uncompressed_size + header_size;
         spec.compressed_size = compressed_size + header_size;
         spec.offset = start_pos;
-        spec.bytes_written = self.sink.pos() - start_pos;
+        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
         // Number of values is incremented for data pages only
         if page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2 {
             spec.num_values = num_values;
@@ -544,6 +546,7 @@ mod tests {
     use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
     use crate::column::page::PageReader;
     use crate::compression::{create_codec, Codec};
+    use crate::data_type::Int32Type;
     use crate::file::{
         properties::{WriterProperties, WriterVersion},
         reader::{FileReader, SerializedFileReader, SerializedPageReader},
@@ -552,48 +555,6 @@ mod tests {
     use crate::record::RowAccessor;
     use crate::util::memory::ByteBufferPtr;
 
-    #[test]
-    fn test_file_writer_error_after_close() {
-        let file = tempfile::tempfile().unwrap();
-        let schema = Arc::new(types::Type::group_type_builder("schema").build().unwrap());
-        let props = Arc::new(WriterProperties::builder().build());
-        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-        writer.close().unwrap();
-        {
-            let res = writer.next_row_group();
-            assert!(res.is_err());
-            if let Err(err) = res {
-                assert_eq!(format!("{}", err), "Parquet error: File writer is closed");
-            }
-        }
-        {
-            let res = writer.close();
-            assert!(res.is_err());
-            if let Err(err) = res {
-                assert_eq!(format!("{}", err), "Parquet error: File writer is closed");
-            }
-        }
-    }
-
-    #[test]
-    fn test_row_group_writer_error_after_close() {
-        let file = tempfile::tempfile().unwrap();
-        let schema = Arc::new(types::Type::group_type_builder("schema").build().unwrap());
-        let props = Arc::new(WriterProperties::builder().build());
-        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-        let mut row_group_writer = writer.next_row_group().unwrap();
-        row_group_writer.close().unwrap();
-
-        let res = row_group_writer.next_column();
-        assert!(res.is_err());
-        if let Err(err) = res {
-            assert_eq!(
-                format!("{}", err),
-                "Parquet error: Row group writer is closed"
-            );
-        }
-    }
-
     #[test]
     fn test_row_group_writer_error_not_all_columns_written() {
         let file = tempfile::tempfile().unwrap();
@@ -609,7 +570,7 @@ mod tests {
         );
         let props = Arc::new(WriterProperties::builder().build());
         let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-        let mut row_group_writer = writer.next_row_group().unwrap();
+        let row_group_writer = writer.next_row_group().unwrap();
         let res = row_group_writer.close();
         assert!(res.is_err());
         if let Err(err) = res {
@@ -647,24 +608,23 @@ mod tests {
         let mut row_group_writer = writer.next_row_group().unwrap();
 
         let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
-        if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
-            typed.write_batch(&[1, 2, 3], None, None).unwrap();
-        }
-        row_group_writer.close_column(col_writer).unwrap();
+        col_writer
+            .typed::<Int32Type>()
+            .write_batch(&[1, 2, 3], None, None)
+            .unwrap();
+        col_writer.close().unwrap();
 
         let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
-        if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
-            typed.write_batch(&[1, 2], None, None).unwrap();
-        }
+        col_writer
+            .typed::<Int32Type>()
+            .write_batch(&[1, 2], None, None)
+            .unwrap();
 
-        let res = row_group_writer.close_column(col_writer);
-        assert!(res.is_err());
-        if let Err(err) = res {
-            assert_eq!(
-                format!("{}", err),
-                "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
-            );
-        }
+        let err = col_writer.close().unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
+        );
     }
 
     #[test]
@@ -682,7 +642,7 @@ mod tests {
                 .unwrap(),
         );
         let props = Arc::new(WriterProperties::builder().build());
-        let mut writer =
+        let writer =
             SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
         writer.close().unwrap();
 
@@ -712,7 +672,7 @@ mod tests {
                 )]))
                 .build(),
         );
-        let mut writer =
+        let writer =
             SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
         writer.close().unwrap();
 
@@ -758,7 +718,7 @@ mod tests {
                 .set_writer_version(WriterVersion::PARQUET_2_0)
                 .build(),
         );
-        let mut writer =
+        let writer =
             SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
         writer.close().unwrap();
 
@@ -971,8 +931,8 @@ mod tests {
         let mut buffer: Vec<u8> = vec![];
         let mut result_pages: Vec<Page> = vec![];
         {
-            let cursor = Cursor::new(&mut buffer);
-            let mut page_writer = SerializedPageWriter::new(cursor);
+            let mut writer = TrackedWrite::new(&mut buffer);
+            let mut page_writer = SerializedPageWriter::new(&mut writer);
 
             for page in compressed_pages {
                 page_writer.write_page(page).unwrap();
@@ -1041,22 +1001,15 @@ mod tests {
 
         for subset in &data {
             let mut row_group_writer = file_writer.next_row_group().unwrap();
-            let col_writer = row_group_writer.next_column().unwrap();
-            if let Some(mut writer) = col_writer {
-                match writer {
-                    ColumnWriter::Int32ColumnWriter(ref mut typed) => {
-                        rows +=
-                            typed.write_batch(&subset[..], None, None).unwrap() as i64;
-                    }
-                    _ => {
-                        unimplemented!();
-                    }
-                }
-                row_group_writer.close_column(writer).unwrap();
+            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
+                rows += writer
+                    .typed::<Int32Type>()
+                    .write_batch(&subset[..], None, None)
+                    .unwrap() as i64;
+                writer.close().unwrap();
             }
-            file_writer.close_row_group(row_group_writer).unwrap();
+            row_group_writer.close().unwrap();
         }
-
         file_writer.close().unwrap();
 
         let reader = assert_send(SerializedFileReader::new(file).unwrap());
@@ -1101,7 +1054,7 @@ mod tests {
     }
 
     fn test_bytes_roundtrip(data: Vec<Vec<i32>>) {
-        let cursor = InMemoryWriteableCursor::default();
+        let mut cursor = Cursor::new(vec![]);
 
         let schema = Arc::new(
             types::Type::group_type_builder("schema")
@@ -1119,30 +1072,24 @@ mod tests {
         {
             let props = Arc::new(WriterProperties::builder().build());
             let mut writer =
-                SerializedFileWriter::new(cursor.clone(), schema, props).unwrap();
+                SerializedFileWriter::new(&mut cursor, schema, props).unwrap();
 
             for subset in &data {
                 let mut row_group_writer = writer.next_row_group().unwrap();
-                let col_writer = row_group_writer.next_column().unwrap();
-                if let Some(mut writer) = col_writer {
-                    match writer {
-                        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
-                            rows += typed.write_batch(&subset[..], None, None).unwrap()
-                                as i64;
-                        }
-                        _ => {
-                            unimplemented!();
-                        }
-                    }
-                    row_group_writer.close_column(writer).unwrap();
+                if let Some(mut writer) = row_group_writer.next_column().unwrap() {
+                    rows += writer
+                        .typed::<Int32Type>()
+                        .write_batch(&subset[..], None, None)
+                        .unwrap() as i64;
+
+                    writer.close().unwrap();
                 }
-                writer.close_row_group(row_group_writer).unwrap();
+                row_group_writer.close().unwrap();
             }
-
             writer.close().unwrap();
         }
 
-        let buffer = cursor.into_inner().unwrap();
+        let buffer = cursor.into_inner();
 
         let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer);
         let reader = SerializedFileReader::new(reading_cursor).unwrap();
diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs
index 6668eec51..fe803a7ff 100644
--- a/parquet/src/record/record_writer.rs
+++ b/parquet/src/record/record_writer.rs
@@ -18,12 +18,12 @@
 use crate::schema::types::TypePtr;
 
 use super::super::errors::ParquetError;
-use super::super::file::writer::RowGroupWriter;
+use super::super::file::writer::SerializedRowGroupWriter;
 
 pub trait RecordWriter<T> {
-    fn write_to_row_group(
+    fn write_to_row_group<W: std::io::Write>(
         &self,
-        row_group_writer: &mut Box<dyn RowGroupWriter>,
+        row_group_writer: &mut SerializedRowGroupWriter<W>,
     ) -> Result<(), ParquetError>;
 
     /// Generated schema
diff --git a/parquet/src/util/cursor.rs b/parquet/src/util/cursor.rs
index c03fc66f2..ff7067fcb 100644
--- a/parquet/src/util/cursor.rs
+++ b/parquet/src/util/cursor.rs
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::util::io::TryClone;
 use std::io::{self, Cursor, Error, ErrorKind, Read, Seek, SeekFrom, Write};
 use std::sync::{Arc, Mutex};
 use std::{cmp, fmt};
 
-use crate::file::writer::TryClone;
-
 /// This is object to use if your file is already in memory.
 /// The sliceable cursor is similar to std::io::Cursor, except that it makes it easy to create "cursor slices".
 /// To achieve this, it uses Arc instead of shared references. Indeed reference fields are painful
@@ -134,11 +133,13 @@ impl Seek for SliceableCursor {
 }
 
 /// Use this type to write Parquet to memory rather than a file.
+#[deprecated = "use Vec<u8> instead"]
 #[derive(Debug, Default, Clone)]
 pub struct InMemoryWriteableCursor {
     buffer: Arc<Mutex<Cursor<Vec<u8>>>>,
 }
 
+#[allow(deprecated)]
 impl InMemoryWriteableCursor {
     /// Consume this instance and return the underlying buffer as long as there are no other
     /// references to this instance.
@@ -168,6 +169,7 @@ impl InMemoryWriteableCursor {
     }
 }
 
+#[allow(deprecated)]
 impl TryClone for InMemoryWriteableCursor {
     fn try_clone(&self) -> std::io::Result<Self> {
         Ok(Self {
@@ -176,6 +178,7 @@ impl TryClone for InMemoryWriteableCursor {
     }
 }
 
+#[allow(deprecated)]
 impl Write for InMemoryWriteableCursor {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
         let mut inner = self.buffer.lock().unwrap();
@@ -188,6 +191,7 @@ impl Write for InMemoryWriteableCursor {
     }
 }
 
+#[allow(deprecated)]
 impl Seek for InMemoryWriteableCursor {
     fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
         let mut inner = self.buffer.lock().unwrap();
diff --git a/parquet/src/util/io.rs b/parquet/src/util/io.rs
index c10e6d616..a7b5e7307 100644
--- a/parquet/src/util/io.rs
+++ b/parquet/src/util/io.rs
@@ -17,7 +17,9 @@
 
 use std::{cell::RefCell, cmp, fmt, io::*};
 
-use crate::file::{reader::Length, writer::ParquetWriter};
+use crate::file::reader::Length;
+#[allow(deprecated)]
+use crate::file::writer::ParquetWriter;
 
 const DEFAULT_BUF_SIZE: usize = 8 * 1024;
 
@@ -156,6 +158,8 @@ impl<R: ParquetReader> Length for FileSource<R> {
 
 /// Struct that represents `File` output stream with position tracking.
 /// Used as a sink in file writer.
+#[deprecated = "use TrackedWrite instead"]
+#[allow(deprecated)]
 pub struct FileSink<W: ParquetWriter> {
     buf: BufWriter<W>,
     // This is not necessarily position in the underlying file,
@@ -163,6 +167,7 @@ pub struct FileSink<W: ParquetWriter> {
     pos: u64,
 }
 
+#[allow(deprecated)]
 impl<W: ParquetWriter> FileSink<W> {
     /// Creates new file sink.
     /// Position is set to whatever position file has.
@@ -176,6 +181,7 @@ impl<W: ParquetWriter> FileSink<W> {
     }
 }
 
+#[allow(deprecated)]
 impl<W: ParquetWriter> Write for FileSink<W> {
     fn write(&mut self, buf: &[u8]) -> Result<usize> {
         let num_bytes = self.buf.write(buf)?;
@@ -188,6 +194,7 @@ impl<W: ParquetWriter> Write for FileSink<W> {
     }
 }
 
+#[allow(deprecated)]
 impl<W: ParquetWriter> Position for FileSink<W> {
     fn pos(&self) -> u64 {
         self.pos
@@ -271,6 +278,7 @@ mod tests {
     }
 
     #[test]
+    #[allow(deprecated)]
     fn test_io_write_with_pos() {
         let mut file = tempfile::tempfile().unwrap();
         file.write_all(&[b'a', b'b', b'c']).unwrap();
diff --git a/parquet/tests/boolean_writer.rs b/parquet/tests/boolean_writer.rs
index b9d757e71..dc2eccfbf 100644
--- a/parquet/tests/boolean_writer.rs
+++ b/parquet/tests/boolean_writer.rs
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use parquet::column::writer::ColumnWriter;
+use parquet::data_type::BoolType;
 use parquet::file::properties::WriterProperties;
 use parquet::file::reader::FileReader;
 use parquet::file::serialized_reader::SerializedFileReader;
-use parquet::file::writer::FileWriter;
 use parquet::file::writer::SerializedFileWriter;
 use parquet::schema::parser::parse_message_type;
 use std::fs;
@@ -53,25 +52,15 @@ fn it_writes_data_without_hanging() {
         while let Some(mut col_writer) =
             row_group_writer.next_column().expect("next column")
         {
-            match col_writer {
-                ColumnWriter::BoolColumnWriter(ref mut typed_writer) => {
-                    typed_writer
-                        .write_batch(&my_bool_values, None, None)
-                        .expect("writing bool column");
-                }
-                _ => {
-                    panic!("only test boolean values");
-                }
-            }
-            row_group_writer
-                .close_column(col_writer)
-                .expect("close column");
+            col_writer
+                .typed::<BoolType>()
+                .write_batch(&my_bool_values, None, None)
+                .expect("writing bool column");
+
+            col_writer.close().expect("close column");
         }
         let rg_md = row_group_writer.close().expect("close row group");
         println!("total rows written: {}", rg_md.num_rows());
-        writer
-            .close_row_group(row_group_writer)
-            .expect("close row groups");
     }
     writer.close().expect("close writer");
 
diff --git a/parquet/tests/custom_writer.rs b/parquet/tests/custom_writer.rs
deleted file mode 100644
index 0a57e79d9..000000000
--- a/parquet/tests/custom_writer.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::fs::File;
-use std::{
-    fs,
-    io::{prelude::*, SeekFrom},
-    sync::Arc,
-};
-
-use parquet::file::writer::TryClone;
-use parquet::{
-    basic::Repetition, basic::Type, file::properties::WriterProperties,
-    file::writer::SerializedFileWriter, schema::types,
-};
-use std::env;
-
-// Test creating some sort of custom writer to ensure the
-// appropriate traits are exposed
-struct CustomWriter {
-    file: File,
-}
-
-impl Write for CustomWriter {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        self.file.write(buf)
-    }
-    fn flush(&mut self) -> std::io::Result<()> {
-        self.file.flush()
-    }
-}
-
-impl Seek for CustomWriter {
-    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
-        self.file.seek(pos)
-    }
-}
-
-impl TryClone for CustomWriter {
-    fn try_clone(&self) -> std::io::Result<Self> {
-        use std::io::{Error, ErrorKind};
-        Err(Error::new(ErrorKind::Other, "Clone not supported"))
-    }
-}
-
-#[test]
-fn test_custom_writer() {
-    let schema = Arc::new(
-        types::Type::group_type_builder("schema")
-            .with_fields(&mut vec![Arc::new(
-                types::Type::primitive_type_builder("col1", Type::INT32)
-                    .with_repetition(Repetition::REQUIRED)
-                    .build()
-                    .unwrap(),
-            )])
-            .build()
-            .unwrap(),
-    );
-    let props = Arc::new(WriterProperties::builder().build());
-
-    let file = get_temp_file("test_custom_file_writer");
-    let test_file = file.try_clone().unwrap();
-
-    let writer = CustomWriter { file };
-
-    // test is that this file can be created
-    let file_writer = SerializedFileWriter::new(writer, schema, props).unwrap();
-    std::mem::drop(file_writer);
-
-    // ensure the file now exists and has non zero size
-    let metadata = test_file.metadata().unwrap();
-    assert!(metadata.len() > 0);
-}
-
-/// Returns file handle for a temp file in 'target' directory with a provided content
-fn get_temp_file(file_name: &str) -> fs::File {
-    // build tmp path to a file in "target/debug/testdata"
-    let mut path_buf = env::current_dir().unwrap();
-    path_buf.push("target");
-    path_buf.push("debug");
-    path_buf.push("testdata");
-    fs::create_dir_all(&path_buf).unwrap();
-    path_buf.push(file_name);
-
-    File::create(path_buf).unwrap()
-}
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index d5d1e6818..fc7af20ca 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -98,9 +98,9 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
 
     (quote! {
     impl #generics RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
-      fn write_to_row_group(
+      fn write_to_row_group<W: std::io::Write>(
         &self,
-        row_group_writer: &mut Box<dyn parquet::file::writer::RowGroupWriter>
+        row_group_writer: &mut parquet::file::writer::SerializedRowGroupWriter<'_, W>
       ) -> Result<(), parquet::errors::ParquetError> {
         let mut row_group_writer = row_group_writer;
         let records = &self; // Used by all the writer snippets to be more clear
@@ -110,7 +110,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
               let mut some_column_writer = row_group_writer.next_column().unwrap();
               if let Some(mut column_writer) = some_column_writer {
                   #writer_snippets
-                  row_group_writer.close_column(column_writer)?;
+                  column_writer.close()?;
               } else {
                   return Err(parquet::errors::ParquetError::General("Failed to get next column".into()))
               }
diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs
index be2e6efaa..835ac793e 100644
--- a/parquet_derive/src/parquet_field.rs
+++ b/parquet_derive/src/parquet_field.rs
@@ -147,7 +147,7 @@ impl Field {
         // this expression just switches between non-nullable and nullable write statements
         let write_batch_expr = if definition_levels.is_some() {
             quote! {
-                if let #column_writer(ref mut typed) = column_writer {
+                if let #column_writer(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
                 } else {
                     panic!("Schema and struct disagree on type for {}", stringify!{#ident})
@@ -155,7 +155,7 @@ impl Field {
             }
         } else {
             quote! {
-                if let #column_writer(ref mut typed) = column_writer {
+                if let #column_writer(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], None, None)?;
                 } else {
                     panic!("Schema and struct disagree on type for {}", stringify!{#ident})
@@ -666,7 +666,7 @@ mod test {
                         {
                             let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( );
 
-                            if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer {
+                            if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer.untyped() {
                                 typed . write_batch ( & vals [ .. ] , None , None ) ?;
                             }  else {
                                 panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
@@ -703,7 +703,7 @@ mod test {
                     }
                 }).collect();
 
-                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer {
+                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
                     typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
@@ -727,7 +727,7 @@ mod test {
                             }
                         }).collect();
 
-                        if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer {
+                        if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
                             typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
                         } else {
                             panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
@@ -750,7 +750,7 @@ mod test {
                             }
                         }).collect();
 
-                        if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer {
+                        if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer.untyped() {
                             typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
                         }  else {
                             panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
@@ -975,7 +975,7 @@ mod test {
         assert_eq!(when.writer_snippet().to_string(),(quote!{
             {
                 let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
-                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], None, None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
@@ -995,7 +995,7 @@ mod test {
                     }
                 }).collect();
 
-                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
@@ -1018,7 +1018,7 @@ mod test {
         assert_eq!(when.writer_snippet().to_string(),(quote!{
             {
                 let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
-                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], None, None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
@@ -1038,7 +1038,7 @@ mod test {
                     }
                 }).collect();
 
-                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
@@ -1061,7 +1061,7 @@ mod test {
         assert_eq!(when.writer_snippet().to_string(),(quote!{
             {
                 let vals : Vec<_> = records.iter().map(|rec| (&rec.unique_id.to_string()[..]).into() ).collect();
-                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], None, None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
@@ -1081,7 +1081,7 @@ mod test {
                     }
                 }).collect();
 
-                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index 2b7c060bb..189802b9a 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -54,10 +54,7 @@ mod tests {
     use super::*;
 
     use parquet::{
-        file::{
-            properties::WriterProperties,
-            writer::{FileWriter, SerializedFileWriter},
-        },
+        file::{properties::WriterProperties, writer::SerializedFileWriter},
         schema::parser::parse_message_type,
     };
     use std::{env, fs, io::Write, sync::Arc};
@@ -133,7 +130,7 @@ mod tests {
 
         let mut row_group = writer.next_row_group().unwrap();
         drs.as_slice().write_to_row_group(&mut row_group).unwrap();
-        writer.close_row_group(row_group).unwrap();
+        row_group.close().unwrap();
         writer.close().unwrap();
     }