Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/05/25 19:05:03 UTC

[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4280: Improve `ArrowWriter` memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871)

tustvold commented on code in PR #4280:
URL: https://github.com/apache/arrow-rs/pull/4280#discussion_r1205906862


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -284,156 +247,271 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
-fn write_leaves<W: Write>(
-    row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
-    arrays: &[ArrayRef],
-    levels: &mut [Vec<LevelInfo>],
-) -> Result<()> {
-    assert_eq!(arrays.len(), levels.len());
-    assert!(!arrays.is_empty());
-
-    let data_type = arrays.first().unwrap().data_type().clone();
-    assert!(arrays.iter().all(|a| a.data_type() == &data_type));
-
-    match &data_type {
-        ArrowDataType::Null
-        | ArrowDataType::Boolean
-        | ArrowDataType::Int8
-        | ArrowDataType::Int16
-        | ArrowDataType::Int32
-        | ArrowDataType::Int64
-        | ArrowDataType::UInt8
-        | ArrowDataType::UInt16
-        | ArrowDataType::UInt32
-        | ArrowDataType::UInt64
-        | ArrowDataType::Float32
-        | ArrowDataType::Float64
-        | ArrowDataType::Timestamp(_, _)
-        | ArrowDataType::Date32
-        | ArrowDataType::Date64
-        | ArrowDataType::Time32(_)
-        | ArrowDataType::Time64(_)
-        | ArrowDataType::Duration(_)
-        | ArrowDataType::Interval(_)
-        | ArrowDataType::Decimal128(_, _)
-        | ArrowDataType::Decimal256(_, _)
-        | ArrowDataType::FixedSizeBinary(_) => {
-            let mut col_writer = row_group_writer.next_column()?.unwrap();
-            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
+/// A list of [`Bytes`] comprising a single column chunk
+#[derive(Default)]
+struct ArrowColumnChunk {
+    length: usize,
+    data: Vec<Bytes>,
+}
+
+impl Length for ArrowColumnChunk {
+    fn len(&self) -> u64 {
+        self.length as _
+    }
+}
+
+impl ChunkReader for ArrowColumnChunk {
+    type T = ChainReader;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        assert_eq!(start, 0);
+        Ok(ChainReader(self.data.clone().into_iter().peekable()))
+    }
+
+    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
+        unimplemented!()
+    }
+}
+
+/// A [`Read`] for an iterator of [`Bytes`]
+struct ChainReader(Peekable<IntoIter<Bytes>>);
+
+impl Read for ChainReader {
+    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
+        let buffer = loop {
+            match self.0.peek_mut() {
+                Some(b) if b.is_empty() => {
+                    self.0.next();
+                    continue;
+                }
+                Some(b) => break b,
+                None => return Ok(0),
             }
-            col_writer.close()
+        };
+
+        let len = buffer.len().min(out.len());
+        let b = buffer.split_to(len);
+        out[..len].copy_from_slice(&b);
+        Ok(len)
+    }
+}
+
+/// A shared [`ArrowColumnChunk`]
+///
+/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
+/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows

Review Comment:
   I tried a lot to avoid this; however, I couldn't devise any mechanism that wasn't both a breaking change and pretty grim. This was the least bad option, and it doesn't commit us to anything long term.
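The doc comment in the diff describes one column chunk shared between `ArrowPageWriter` and `ArrowRowGroupWriter`. The hunk is cut off before that shared type is defined, so what follows is only a std-only sketch of the ownership pattern. It assumes the sharing is done with `Arc<Mutex<...>>`, and `ColumnChunk`, `SharedChunk`, `PageSink` and `RowGroupSink` are hypothetical names, not arrow-rs APIs. The page writer appends encoded pages through one handle, and the row group writer takes the buffered pages through another handle on flush, so neither struct has to borrow from the other.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `ArrowColumnChunk`: encoded page buffers plus total length.
#[derive(Default, Debug)]
struct ColumnChunk {
    length: usize,
    data: Vec<Vec<u8>>,
}

/// Hypothetical shared handle: both writers hold a clone of the same `Arc`.
type SharedChunk = Arc<Mutex<ColumnChunk>>;

/// Hypothetical page writer: owns a handle and appends each encoded page.
struct PageSink {
    buffer: SharedChunk,
}

impl PageSink {
    fn write_page(&mut self, page: Vec<u8>) {
        let mut chunk = self.buffer.lock().unwrap();
        chunk.length += page.len();
        chunk.data.push(page);
    }
}

/// Hypothetical row group writer: keeps its own handle to the same chunk.
struct RowGroupSink {
    buffer: SharedChunk,
}

impl RowGroupSink {
    /// Creates a page writer that shares this row group's buffer.
    fn page_sink(&self) -> PageSink {
        PageSink { buffer: Arc::clone(&self.buffer) }
    }

    /// On flush, takes the buffered pages and leaves an empty chunk behind.
    fn flush(&self) -> ColumnChunk {
        std::mem::take(&mut *self.buffer.lock().unwrap())
    }
}

fn main() {
    let row_group = RowGroupSink { buffer: SharedChunk::default() };
    let mut pages = row_group.page_sink();
    pages.write_page(vec![1, 2, 3]);
    pages.write_page(vec![4, 5]);

    // The row group writer sees the pages without borrowing from the page writer.
    let chunk = row_group.flush();
    assert_eq!(chunk.length, 5);
    assert_eq!(chunk.data.len(), 2);
    println!("flushed {} bytes in {} pages", chunk.length, chunk.data.len());
}
```

The trade-off in this sketch is a runtime lock and reference count in place of a compile-time borrow, which is what lets the two writers avoid a self-referential struct.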


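The `ChainReader` in the diff exposes a list of page buffers as a single `Read` stream. Below is a std-only sketch of the same technique, with `Vec<u8>` standing in for `bytes::Bytes` (an assumption made so the example has no external dependencies). Empty buffers are skipped, each `read` call copies from at most one buffer, and `Ok(0)` signals end of input once the list is exhausted.

```rust
use std::io::Read;
use std::iter::Peekable;
use std::vec::IntoIter;

/// Reads through a list of buffers as if they were one contiguous byte stream.
/// `Vec<u8>` stands in for `bytes::Bytes` in this sketch.
struct ChainReader(Peekable<IntoIter<Vec<u8>>>);

impl Read for ChainReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        // Skip exhausted buffers; report EOF (Ok(0)) once none remain.
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        // Copy as much of the current buffer as fits, then drop those bytes.
        let len = buffer.len().min(out.len());
        out[..len].copy_from_slice(&buffer[..len]);
        buffer.drain(..len);
        Ok(len)
    }
}

fn main() {
    let pages = vec![b"abc".to_vec(), Vec::new(), b"de".to_vec()];
    let mut reader = ChainReader(pages.into_iter().peekable());

    // A single read never crosses a buffer boundary.
    let mut small = [0u8; 2];
    let n = reader.read(&mut small).unwrap();
    assert_eq!(&small[..n], b"ab");

    // read_to_end keeps calling read until it returns Ok(0).
    let mut rest = Vec::new();
    reader.read_to_end(&mut rest).unwrap();
    assert_eq!(rest, b"cde");
}
```

Unlike `Bytes::split_to`, which is O(1), `Vec::drain` shifts the remaining bytes, so this sketch trades efficiency for having no dependencies. Because a `read` may return fewer bytes than requested, callers should loop (as `read_to_end` does) rather than assume one call fills the buffer.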
