Posted to github@arrow.apache.org by "devinjdangelo (via GitHub)" <gi...@apache.org> on 2023/09/25 23:48:07 UTC

[GitHub] [arrow-datafusion] devinjdangelo commented on a diff in pull request #7632: Support All Statistics and Enable Backpressure in Parallel Parquet Writer

devinjdangelo commented on code in PR #7632:
URL: https://github.com/apache/arrow-datafusion/pull/7632#discussion_r1336484162


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -828,113 +833,84 @@ impl DataSink for ParquetSink {
     }
 }
 
-/// This is the return type when joining subtasks which are serializing parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
+/// encoded columns which can be appended to a SerializedRowGroupWriter
+type RBStreamSerializeResult =
+    Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;
 
 /// Parallelizes the serialization of a single parquet file, by first serializing N
 /// independent RecordBatch streams in parallel to parquet files in memory. Another
 /// task then stitches these independent files back together and streams this large
 /// single parquet file to an ObjectStore in multiple parts.
 async fn output_single_parquet_file_parallelized(
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
+    data: Vec<SendableRecordBatchStream>,
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
 ) -> Result<usize> {
     let mut row_count = 0;
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
+    let (serialize_tx, mut serialize_rx) =

Review Comment:
   I am using `mpsc::channel` even more extensively in the new implementation (#7655). I have experimented with `StreamExt::buffered()` in the past, but it did not seem to use multiple CPU cores, whereas spawning tokio tasks that communicate over a channel did.
   
   I can revisit this, though, since it could simplify the code, and I may have just made a mistake the last time I tried it :thinking:.
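
A minimal sketch of the spawn-plus-bounded-channel pattern described above. To stay dependency-free it swaps in `std::thread` and `std::sync::mpsc::sync_channel` for tokio tasks and `tokio::sync::mpsc::channel`, so it is an analogue and not the code in this PR. `serialize_partition`, `write_parallel`, and `channel_capacity` are hypothetical names used only for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the CPU-heavy encoding of one input partition.
/// Returns the encoded bytes and the number of rows they contain.
fn serialize_partition(rows: &[u64]) -> (Vec<u8>, usize) {
    let bytes: Vec<u8> = rows.iter().flat_map(|r| r.to_le_bytes()).collect();
    (bytes, rows.len())
}

/// Serializes every partition on its own worker and concatenates the results
/// on the calling thread. Results are appended in arrival order.
fn write_parallel(partitions: Vec<Vec<u64>>, channel_capacity: usize) -> (Vec<u8>, usize) {
    // Bounded channel: `send` blocks once `channel_capacity` results are
    // queued, so workers cannot run arbitrarily far ahead of the consumer.
    let (tx, rx) = mpsc::sync_channel::<(Vec<u8>, usize)>(channel_capacity);

    let mut handles = Vec::new();
    for part in partitions {
        let tx = tx.clone();
        // Each worker is a separate thread, so the encoding work can be
        // scheduled on different CPU cores.
        handles.push(thread::spawn(move || {
            let result = serialize_partition(&part);
            tx.send(result).expect("receiver dropped");
        }));
    }
    // Drop the original sender so the receive loop ends once all workers finish.
    drop(tx);

    let mut output = Vec::new();
    let mut row_count = 0;
    for (bytes, rows) in rx {
        output.extend_from_slice(&bytes);
        row_count += rows;
    }
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    (output, row_count)
}

fn main() {
    let partitions = vec![vec![1, 2, 3], vec![4, 5], vec![6]];
    let (bytes, rows) = write_parallel(partitions, 2);
    println!("wrote {} rows in {} bytes", rows, bytes.len());
}
```

One plausible explanation for the earlier observation: `buffered()` polls its futures concurrently inside the single task that drives the stream, so CPU-bound work stays on one thread unless each future is itself handed to a spawned task.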


