You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "devinjdangelo (via GitHub)" <gi...@apache.org> on 2023/09/28 23:01:14 UTC

[GitHub] [arrow-rs] devinjdangelo commented on a diff in pull request #4871: Support Encoding Parquet Columns in Parallel

devinjdangelo commented on code in PR #4871:
URL: https://github.com/apache/arrow-rs/pull/4871#discussion_r1340734930


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -347,92 +349,213 @@ impl PageWriter for ArrowPageWriter {
     }
 }
 
-/// Encodes a leaf column to [`ArrowPageWriter`]
-enum ArrowColumnWriter {
+/// A leaf column that can be encoded by [`ArrowColumnWriter`]
+pub struct ArrowLeafColumn(ArrayLevels);
+
+/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
+pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
+    let levels = calculate_array_levels(array, field)?;
+    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
+}
+
+/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
+///
+/// Note: This is a low-level interface for applications that require fine-grained control
+/// of encoding, see [`ArrowWriter`] for a higher-level interface
+///
+/// ```
+/// // The arrow schema
+/// # use std::sync::Arc;
+/// # use arrow_array::*;
+/// # use arrow_schema::*;
+/// # use parquet::arrow::arrow_to_parquet_schema;
+/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
+/// # use parquet::file::properties::WriterProperties;
+/// # use parquet::file::writer::SerializedFileWriter;
+/// #
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("i32", DataType::Int32, false),
+///     Field::new("f32", DataType::Float32, false),
+/// ]));
+///
+/// // Compute the parquet schema
+/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
+/// let props = Arc::new(WriterProperties::default());
+///
+/// // Create writers for each of the leaf columns
+/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
+///
+/// // Spawn a worker thread for each column
+/// // This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better
+/// let mut workers: Vec<_> = col_writers
+///     .into_iter()
+///     .map(|mut col_writer| {
+///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
+///         let handle = std::thread::spawn(move || {
+///             for col in recv {
+///                 col_writer.write(&col)?;
+///             }
+///             col_writer.close()
+///         });
+///         (handle, send)
+///     })
+///     .collect();
+///
+/// // Create parquet writer
+/// let root_schema = parquet_schema.root_schema_ptr();
+/// let mut out = Vec::with_capacity(1024); // This could be a File
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
+///
+/// // Start row group
+/// let mut row_group = writer.next_row_group().unwrap();
+///
+/// // Columns to encode
+/// let to_write = vec![
+///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
+///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
+/// ];
+///
+/// // Spawn work to encode columns
+/// let mut worker_iter = workers.iter_mut();
+/// for (a, f) in to_write.iter().zip(&schema.fields) {
+///     for c in compute_leaves(f, a).unwrap() {
+///         worker_iter.next().unwrap().1.send(c).unwrap();
+///     }
+/// }
+///
+/// // Finish up parallel column encoding
+/// for (handle, send) in workers {
+///     drop(send); // Drop send side to signal termination
+///     let (chunk, result) = handle.join().unwrap().unwrap();
+///     row_group.append_column(&chunk, result).unwrap();
+/// }
+/// row_group.close().unwrap();
+///
+/// let metadata = writer.close().unwrap();
+/// assert_eq!(metadata.num_rows, 3);
+/// ```
+pub struct ArrowColumnWriter {
+    writer: ArrowColumnWriterImpl,
+    chunk: SharedColumnChunk,
+}
+
+enum ArrowColumnWriterImpl {
     ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
     Column(ColumnWriter<'static>),
 }
 
 impl ArrowColumnWriter {
+    /// Write an [`ArrowLeafColumn`]
+    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
+        match &mut self.writer {
+            ArrowColumnWriterImpl::Column(c) => {
+                write_leaf(c, &col.0)?;
+            }
+            ArrowColumnWriterImpl::ByteArray(c) => {
+                write_primitive(c, col.0.array().as_ref(), &col.0)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Close this column returning the [`ArrowColumnChunk`] and [`ColumnCloseResult`]
+    pub fn close(self) -> Result<(ArrowColumnChunk, ColumnCloseResult)> {

Review Comment:
   Perhaps there could be a light wrapper around append_column that accepts `EncodedRowGroup` as @alamb defined it?
   
   It isn't a big deal, but I agree with @alamb that it is confusing at first since the consumer of this API does not really need to concern themself with either of these two structs individually. They are more internal implementation details of arrow-rs. `EncodedRowGroup` would be more clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org