You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/25 18:51:57 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #4280: Improve `ArrowWriter` memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871)

alamb commented on code in PR #4280:
URL: https://github.com/apache/arrow-rs/pull/4280#discussion_r1205870610


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -152,43 +151,69 @@ impl<W: Write> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }
 
-    /// Enqueues the provided `RecordBatch` to be written
+    /// Returns the length in bytes of the current in progress row group
+    pub fn in_progress_size(&self) -> usize {
+        match &self.in_progress {
+            Some(in_progress) => in_progress
+                .writers
+                .iter()
+                .map(|(x, _)| x.lock().unwrap().length)
+                .sum(),
+            None => 0,
+        }
+    }
+
+    /// Encodes the provided [`RecordBatch`]
     ///
-    /// If following this there are more than `max_row_group_size` rows buffered,
-    /// this will flush out one or more row groups with `max_row_group_size` rows,
-    /// and drop any fully written `RecordBatch`
+    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
+    /// rows, the contents of `batch` will be distributed across multiple row groups such that all
+    /// but the final row group in the file contain [`WriterProperties::max_row_group_size`] rows

Review Comment:
   ```suggestion
       /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
       /// rows, the contents of `batch` will be written to one or more row groups such that all
       /// but the final row group in the file contain [`WriterProperties::max_row_group_size`] rows
   ```
   
   (minor suggestion that I think is clearer -- feel free to ignore)



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -284,156 +247,271 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
-fn write_leaves<W: Write>(
-    row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
-    arrays: &[ArrayRef],
-    levels: &mut [Vec<LevelInfo>],
-) -> Result<()> {
-    assert_eq!(arrays.len(), levels.len());
-    assert!(!arrays.is_empty());
-
-    let data_type = arrays.first().unwrap().data_type().clone();
-    assert!(arrays.iter().all(|a| a.data_type() == &data_type));
-
-    match &data_type {
-        ArrowDataType::Null
-        | ArrowDataType::Boolean
-        | ArrowDataType::Int8
-        | ArrowDataType::Int16
-        | ArrowDataType::Int32
-        | ArrowDataType::Int64
-        | ArrowDataType::UInt8
-        | ArrowDataType::UInt16
-        | ArrowDataType::UInt32
-        | ArrowDataType::UInt64
-        | ArrowDataType::Float32
-        | ArrowDataType::Float64
-        | ArrowDataType::Timestamp(_, _)
-        | ArrowDataType::Date32
-        | ArrowDataType::Date64
-        | ArrowDataType::Time32(_)
-        | ArrowDataType::Time64(_)
-        | ArrowDataType::Duration(_)
-        | ArrowDataType::Interval(_)
-        | ArrowDataType::Decimal128(_, _)
-        | ArrowDataType::Decimal256(_, _)
-        | ArrowDataType::FixedSizeBinary(_) => {
-            let mut col_writer = row_group_writer.next_column()?.unwrap();
-            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
+/// A list of [`Bytes`] comprising a single column chunk
+#[derive(Default)]
+struct ArrowColumnChunk {
+    length: usize,
+    data: Vec<Bytes>,
+}
+
+impl Length for ArrowColumnChunk {
+    fn len(&self) -> u64 {
+        self.length as _
+    }
+}
+
+impl ChunkReader for ArrowColumnChunk {
+    type T = ChainReader;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        assert_eq!(start, 0);
+        Ok(ChainReader(self.data.clone().into_iter().peekable()))
+    }
+
+    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
+        unimplemented!()
+    }
+}
+
+/// A [`Read`] for an iterator of [`Bytes`]
+struct ChainReader(Peekable<IntoIter<Bytes>>);
+
+impl Read for ChainReader {
+    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
+        let buffer = loop {
+            match self.0.peek_mut() {
+                Some(b) if b.is_empty() => {
+                    self.0.next();
+                    continue;
+                }
+                Some(b) => break b,
+                None => return Ok(0),
             }
-            col_writer.close()
+        };
+
+        let len = buffer.len().min(out.len());
+        let b = buffer.split_to(len);
+        out[..len].copy_from_slice(&b);
+        Ok(len)
+    }
+}
+
+/// A shared [`ArrowColumnChunk`]
+///
+/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
+/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows

Review Comment:
   I looked at this a bit -- is the issue that you can't get the `ColumnChunk` back from the `GenericColumnWriter`?
   
   I wonder if we could add something like `GenericColumnWriter::close_into_inner()` that returned the inner writer and the ColumnCloseResult ๐Ÿค”  
   
   But that would still require downcasting back so ๐Ÿคท  not sure it is any better 



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -152,43 +151,69 @@ impl<W: Write> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }
 
-    /// Enqueues the provided `RecordBatch` to be written
+    /// Returns the length in bytes of the current in progress row group
+    pub fn in_progress_size(&self) -> usize {
+        match &self.in_progress {
+            Some(in_progress) => in_progress
+                .writers
+                .iter()
+                .map(|(x, _)| x.lock().unwrap().length)
+                .sum(),
+            None => 0,
+        }
+    }
+
+    /// Encodes the provided [`RecordBatch`]
     ///
-    /// If following this there are more than `max_row_group_size` rows buffered,
-    /// this will flush out one or more row groups with `max_row_group_size` rows,
-    /// and drop any fully written `RecordBatch`
+    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
+    /// rows, the contents of `batch` will be distributed across multiple row groups such that all
+    /// but the final row group in the file contain [`WriterProperties::max_row_group_size`] rows
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
-        // validate batch schema against writer's supplied schema
-        let batch_schema = batch.schema();
-        if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
-            || self.arrow_schema.contains(&batch_schema))
-        {
-            return Err(ParquetError::ArrowError(
-                "Record batch schema does not match writer schema".to_string(),
-            ));
+        if batch.num_rows() == 0 {
+            return Ok(());
         }
 
-        for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
-            buffer.push_back(column.clone())
+        // If would exceed max_row_group_size, split batch
+        if self.buffered_rows + batch.num_rows() > self.max_row_group_size {
+            let to_write = self.max_row_group_size - self.buffered_rows;
+            let a = batch.slice(0, to_write);
+            let b = batch.slice(to_write, batch.num_rows() - to_write);
+            self.write(&a)?;
+            return self.write(&b);

Review Comment:
   I agree -- perhaps we can fix it if this turns out to be a problem in practice.
   
   There is also a workaround for anyone who hits this, which is to break up their input RecordBatch (using `slice()`) into smaller parts



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -152,43 +151,69 @@ impl<W: Write> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }
 
-    /// Enqueues the provided `RecordBatch` to be written
+    /// Returns the length in bytes of the current in progress row group
+    pub fn in_progress_size(&self) -> usize {

Review Comment:
   I recommend we call this `in_progress_memory_size()` to be more consistent with `RecordBatch::get_array_memory_size()`
   
   Also, could you please add an accessor for `buffered_rows`? perhaps
   
   ```rust
       pub fn in_progress_rows(&self) -> usize {
   ```
   
   (I can add this as a follow on PR too if you prefer)



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -284,156 +247,271 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
-fn write_leaves<W: Write>(
-    row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
-    arrays: &[ArrayRef],
-    levels: &mut [Vec<LevelInfo>],
-) -> Result<()> {
-    assert_eq!(arrays.len(), levels.len());
-    assert!(!arrays.is_empty());
-
-    let data_type = arrays.first().unwrap().data_type().clone();
-    assert!(arrays.iter().all(|a| a.data_type() == &data_type));
-
-    match &data_type {
-        ArrowDataType::Null
-        | ArrowDataType::Boolean
-        | ArrowDataType::Int8
-        | ArrowDataType::Int16
-        | ArrowDataType::Int32
-        | ArrowDataType::Int64
-        | ArrowDataType::UInt8
-        | ArrowDataType::UInt16
-        | ArrowDataType::UInt32
-        | ArrowDataType::UInt64
-        | ArrowDataType::Float32
-        | ArrowDataType::Float64
-        | ArrowDataType::Timestamp(_, _)
-        | ArrowDataType::Date32
-        | ArrowDataType::Date64
-        | ArrowDataType::Time32(_)
-        | ArrowDataType::Time64(_)
-        | ArrowDataType::Duration(_)
-        | ArrowDataType::Interval(_)
-        | ArrowDataType::Decimal128(_, _)
-        | ArrowDataType::Decimal256(_, _)
-        | ArrowDataType::FixedSizeBinary(_) => {
-            let mut col_writer = row_group_writer.next_column()?.unwrap();
-            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
+/// A list of [`Bytes`] comprising a single column chunk
+#[derive(Default)]
+struct ArrowColumnChunk {
+    length: usize,
+    data: Vec<Bytes>,
+}
+
+impl Length for ArrowColumnChunk {
+    fn len(&self) -> u64 {
+        self.length as _
+    }
+}
+
+impl ChunkReader for ArrowColumnChunk {
+    type T = ChainReader;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        assert_eq!(start, 0);
+        Ok(ChainReader(self.data.clone().into_iter().peekable()))
+    }
+
+    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
+        unimplemented!()
+    }
+}
+
+/// A [`Read`] for an iterator of [`Bytes`]
+struct ChainReader(Peekable<IntoIter<Bytes>>);
+
+impl Read for ChainReader {
+    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
+        let buffer = loop {
+            match self.0.peek_mut() {
+                Some(b) if b.is_empty() => {
+                    self.0.next();
+                    continue;
+                }
+                Some(b) => break b,
+                None => return Ok(0),
             }
-            col_writer.close()
+        };
+
+        let len = buffer.len().min(out.len());
+        let b = buffer.split_to(len);
+        out[..len].copy_from_slice(&b);
+        Ok(len)
+    }
+}
+
+/// A shared [`ArrowColumnChunk`]
+///
+/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
+/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
+type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
+
+#[derive(Default)]
+struct ArrowPageWriter {
+    buffer: SharedColumnChunk,
+}
+
+impl PageWriter for ArrowPageWriter {
+    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
+        let mut buf = self.buffer.try_lock().unwrap();
+        let page_header = page.to_thrift_header();
+        let header = {
+            let mut header = Vec::with_capacity(1024);
+            let mut protocol = TCompactOutputProtocol::new(&mut header);
+            page_header.write_to_out_protocol(&mut protocol)?;
+            Bytes::from(header)
+        };
+
+        let data = page.compressed_page().buffer().clone();
+        let compressed_size = data.len() + header.len();
+
+        let mut spec = PageWriteSpec::new();
+        spec.page_type = page.page_type();
+        spec.num_values = page.num_values();
+        spec.uncompressed_size = page.uncompressed_size() + header.len();
+        spec.offset = buf.length as u64;
+        spec.compressed_size = compressed_size;
+        spec.bytes_written = compressed_size as u64;
+
+        buf.length += compressed_size;
+        buf.data.push(header);
+        buf.data.push(data.into());
+
+        Ok(spec)
+    }
+
+    fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> {
+        // Skip writing metadata as won't be copied anyway
+        Ok(())
+    }
+
+    fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
+
+/// Encodes a leaf column to [`ArrowPageWriter`]
+enum ArrowColumnWriter {
+    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
+    Column(ColumnWriter<'static>),
+}
+
+/// Encodes [`RecordBatch`] to a parquet row group
+struct ArrowRowGroupWriter {
+    writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
+    schema: SchemaRef,
+}
+
+impl ArrowRowGroupWriter {
+    fn new(

Review Comment:
   Given this returns `Result` I think `try_new(...)` would be a more idomatic name



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -152,43 +151,69 @@ impl<W: Write> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }
 
-    /// Enqueues the provided `RecordBatch` to be written
+    /// Returns the length in bytes of the current in progress row group
+    pub fn in_progress_size(&self) -> usize {
+        match &self.in_progress {
+            Some(in_progress) => in_progress
+                .writers
+                .iter()
+                .map(|(x, _)| x.lock().unwrap().length)
+                .sum(),
+            None => 0,
+        }
+    }
+
+    /// Encodes the provided [`RecordBatch`]
     ///
-    /// If following this there are more than `max_row_group_size` rows buffered,
-    /// this will flush out one or more row groups with `max_row_group_size` rows,
-    /// and drop any fully written `RecordBatch`
+    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
+    /// rows, the contents of `batch` will be distributed across multiple row groups such that all
+    /// but the final row group in the file contain [`WriterProperties::max_row_group_size`] rows
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
-        // validate batch schema against writer's supplied schema
-        let batch_schema = batch.schema();
-        if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
-            || self.arrow_schema.contains(&batch_schema))
-        {
-            return Err(ParquetError::ArrowError(
-                "Record batch schema does not match writer schema".to_string(),
-            ));
+        if batch.num_rows() == 0 {
+            return Ok(());
         }
 
-        for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
-            buffer.push_back(column.clone())
+        // If would exceed max_row_group_size, split batch
+        if self.buffered_rows + batch.num_rows() > self.max_row_group_size {
+            let to_write = self.max_row_group_size - self.buffered_rows;
+            let a = batch.slice(0, to_write);
+            let b = batch.slice(to_write, batch.num_rows() - to_write);
+            self.write(&a)?;
+            return self.write(&b);
         }
 
         self.buffered_rows += batch.num_rows();
-        self.flush_completed()?;
+        let in_progress = match &mut self.in_progress {
+            Some(in_progress) => in_progress,
+            x => x.insert(ArrowRowGroupWriter::new(
+                self.writer.schema_descr(),
+                self.writer.properties(),
+                &self.arrow_schema,
+            )?),
+        };
 
-        Ok(())
-    }
+        in_progress.write(batch)?;
 
-    /// Flushes buffered data until there are less than `max_row_group_size` rows buffered
-    fn flush_completed(&mut self) -> Result<()> {
-        while self.buffered_rows >= self.max_row_group_size {
-            self.flush_rows(self.max_row_group_size)?;
+        if self.buffered_rows >= self.max_row_group_size {
+            self.flush()?
         }
         Ok(())
     }
 
     /// Flushes all buffered rows into a new row group
     pub fn flush(&mut self) -> Result<()> {
-        self.flush_rows(self.buffered_rows)
+        let in_progress = match self.in_progress.take() {
+            Some(in_progress) => in_progress,
+            None => return Ok(()),
+        };
+
+        self.buffered_rows = 0;

Review Comment:
   Can `self.buffered_rows` get out of sync here (like can it be left at zero if, for example, `next_row_group()` returns and error?



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -471,51 +549,45 @@ fn write_leaf(
                 get_bool_array_slice(array, indices).as_slice(),
                 levels.def_levels(),
                 levels.rep_levels(),
-            )?
+            )
         }
         ColumnWriter::Int64ColumnWriter(ref mut typed) => {
             match column.data_type() {
                 ArrowDataType::Int64 => {
                     let array = column.as_primitive::<Int64Type>();
-                    write_primitive(typed, array.values(), levels)?
+                    write_primitive(typed, array.values(), levels)
                 }
                 ArrowDataType::UInt64 => {
                     let values = column.as_primitive::<UInt64Type>().values();
                     // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
                     // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
                     let array = values.inner().typed_data::<i64>();
-                    write_primitive(typed, array, levels)?
+                    write_primitive(typed, array, levels)
                 }
                 ArrowDataType::Decimal128(_, _) => {
                     // use the int64 to represent the decimal with low precision
                     let array = column
                         .as_primitive::<Decimal128Type>()
-                        .unary::<_, types::Int64Type>(|v| v as i64);
-                    write_primitive(typed, array.values(), levels)?
+                        .unary::<_, Int64Type>(|v| v as i64);
+                    write_primitive(typed, array.values(), levels)
                 }
                 _ => {
                     let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                     let array = array.as_primitive::<Int64Type>();
-                    write_primitive(typed, array.values(), levels)?
+                    write_primitive(typed, array.values(), levels)
                 }
             }
         }
         ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
             unreachable!("Currently unreachable because data type not supported")
         }
         ColumnWriter::FloatColumnWriter(ref mut typed) => {
-            let array = column
-                .as_any()
-                .downcast_ref::<arrow_array::Float32Array>()
-                .expect("Unable to get Float32 array");
-            write_primitive(typed, array.values(), levels)?
+            let array = column.as_primitive::<Float32Type>();

Review Comment:
   ๐Ÿงน  nice and tidy



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -284,156 +247,271 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
-fn write_leaves<W: Write>(
-    row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
-    arrays: &[ArrayRef],
-    levels: &mut [Vec<LevelInfo>],
-) -> Result<()> {
-    assert_eq!(arrays.len(), levels.len());
-    assert!(!arrays.is_empty());
-
-    let data_type = arrays.first().unwrap().data_type().clone();
-    assert!(arrays.iter().all(|a| a.data_type() == &data_type));
-
-    match &data_type {
-        ArrowDataType::Null
-        | ArrowDataType::Boolean
-        | ArrowDataType::Int8
-        | ArrowDataType::Int16
-        | ArrowDataType::Int32
-        | ArrowDataType::Int64
-        | ArrowDataType::UInt8
-        | ArrowDataType::UInt16
-        | ArrowDataType::UInt32
-        | ArrowDataType::UInt64
-        | ArrowDataType::Float32
-        | ArrowDataType::Float64
-        | ArrowDataType::Timestamp(_, _)
-        | ArrowDataType::Date32
-        | ArrowDataType::Date64
-        | ArrowDataType::Time32(_)
-        | ArrowDataType::Time64(_)
-        | ArrowDataType::Duration(_)
-        | ArrowDataType::Interval(_)
-        | ArrowDataType::Decimal128(_, _)
-        | ArrowDataType::Decimal256(_, _)
-        | ArrowDataType::FixedSizeBinary(_) => {
-            let mut col_writer = row_group_writer.next_column()?.unwrap();
-            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
+/// A list of [`Bytes`] comprising a single column chunk
+#[derive(Default)]
+struct ArrowColumnChunk {
+    length: usize,
+    data: Vec<Bytes>,
+}
+
+impl Length for ArrowColumnChunk {
+    fn len(&self) -> u64 {
+        self.length as _
+    }
+}
+
+impl ChunkReader for ArrowColumnChunk {
+    type T = ChainReader;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        assert_eq!(start, 0);

Review Comment:
   Maybe you can add a `//` comment here explaining what this is relying on, in case anyone in the future hits this assert



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -152,43 +151,69 @@ impl<W: Write> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }
 
-    /// Enqueues the provided `RecordBatch` to be written
+    /// Returns the length in bytes of the current in progress row group
+    pub fn in_progress_size(&self) -> usize {
+        match &self.in_progress {
+            Some(in_progress) => in_progress
+                .writers
+                .iter()
+                .map(|(x, _)| x.lock().unwrap().length)
+                .sum(),
+            None => 0,
+        }
+    }
+
+    /// Encodes the provided [`RecordBatch`]
     ///
-    /// If following this there are more than `max_row_group_size` rows buffered,
-    /// this will flush out one or more row groups with `max_row_group_size` rows,
-    /// and drop any fully written `RecordBatch`
+    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
+    /// rows, the contents of `batch` will be distributed across multiple row groups such that all
+    /// but the final row group in the file contain [`WriterProperties::max_row_group_size`] rows
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
-        // validate batch schema against writer's supplied schema
-        let batch_schema = batch.schema();
-        if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
-            || self.arrow_schema.contains(&batch_schema))
-        {
-            return Err(ParquetError::ArrowError(
-                "Record batch schema does not match writer schema".to_string(),
-            ));
+        if batch.num_rows() == 0 {
+            return Ok(());
         }
 
-        for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
-            buffer.push_back(column.clone())
+        // If would exceed max_row_group_size, split batch
+        if self.buffered_rows + batch.num_rows() > self.max_row_group_size {
+            let to_write = self.max_row_group_size - self.buffered_rows;
+            let a = batch.slice(0, to_write);
+            let b = batch.slice(to_write, batch.num_rows() - to_write);
+            self.write(&a)?;
+            return self.write(&b);
         }
 
         self.buffered_rows += batch.num_rows();
-        self.flush_completed()?;
+        let in_progress = match &mut self.in_progress {
+            Some(in_progress) => in_progress,
+            x => x.insert(ArrowRowGroupWriter::new(

Review Comment:
   TIL `insert` ๐Ÿ‘  very nice



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -152,43 +151,69 @@ impl<W: Write> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }
 
-    /// Enqueues the provided `RecordBatch` to be written
+    /// Returns the length in bytes of the current in progress row group
+    pub fn in_progress_size(&self) -> usize {

Review Comment:
   Alternately, perhaps the code would be simpler if you removed `self.buffered_rows` and calculated it based on `in_progress`?
   
   Perhaps something like ๐Ÿค” 
   
   ```rust
       pub fn buffered_rows(&self) -> usize {
         self.in_progress.map(|p| p.len()).unwrap_or(0)
       }
   ```



##########
parquet/src/column/writer/mod.rs:
##########
@@ -1512,7 +1528,7 @@ mod tests {
             metadata.encodings(),
             &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
         );
-        assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
+        assert_eq!(metadata.num_values(), 4);

Review Comment:
   I am hesitant to include this change in this PR -- would you be willing to restore the old behavior in this PR and then make a follow on PR that proposes fixing the behavior?
   
   It would be nice to be able to keep the concerns (bug fix and buffer reduction) separate



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org