Posted to github@arrow.apache.org by "devinjdangelo (via GitHub)" <gi...@apache.org> on 2023/09/23 12:13:03 UTC

[GitHub] [arrow-datafusion] devinjdangelo opened a new pull request, #7632: Support All Statistics and Enable Backpressure in Parallel Parquet Writer

devinjdangelo opened a new pull request, #7632:
URL: https://github.com/apache/arrow-datafusion/pull/7632

   This PR is a draft since it cannot compile without upstream arrow-rs changes: #here
   
   ## Which issue does this PR close?
   
   Closes #7591 
   Closes #7589
   Related to https://github.com/apache/arrow-rs/issues/1718
   
   ## Rationale for this change
   
   Parallel parquet serialization process implemented in #7562 did not support all parquet metadata (indexes/bloom filters) and had no backpressure on serialization tasks. This PR aims to address these two deficiencies. 
   
   ## What changes are included in this PR?
   
   - Parallel parquet serialization tasks now use `ArrowRowGroupWriter` directly rather than `ArrowWriter`
   - Upstream `arrow-rs` changes filed to make `ArrowRowGroupWriter` public and `Send` so it can be used across an `.await`
   - Parquet serialization tasks can now be throttled via a bounded channel, providing backpressure (see the sketch below).
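   Roughly how the throttling works, as a minimal standalone sketch (plain tokio only; `serialize_one_stream` is a hypothetical placeholder for the per-stream serialization work, not code from this PR):

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

// Hypothetical stand-in for serializing one RecordBatch stream into an
// encoded row group (the real return type in this PR is RBStreamSerializeResult).
async fn serialize_one_stream(stream_id: usize) -> usize {
    stream_id
}

#[tokio::main]
async fn main() {
    // A bounded channel of size N means at most ~N serialization tasks can be
    // in flight before the producer is forced to wait (backpressure).
    let (tx, mut rx) = mpsc::channel::<JoinHandle<usize>>(4);

    let producer = tokio::spawn(async move {
        for stream_id in 0..16 {
            let handle = tokio::spawn(serialize_one_stream(stream_id));
            // `send` awaits when the channel is full, throttling how far the
            // producer can run ahead of the consumer.
            if tx.send(handle).await.is_err() {
                break; // receiver dropped, stop launching work
            }
        }
    });

    // The consumer drains tasks in order, preserving row group ordering.
    while let Some(handle) = rx.recv().await {
        let _serialized = handle.await.unwrap();
    }
    producer.await.unwrap();
}
```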
   
   ## Benchmarking Results
   
   The results show the parallel parquet process in this PR is ~10% faster than the previous implementation, in addition to supporting statistics/bloom filters. `channel_limit=N` means that at most N parallel parquet serialization tasks can be running at one time. Surprisingly, setting this number low can actually increase peak memory usage.
   
   See #7562 for benchmarking script used. 
   
   ### Test 1, All 16 Columns, ~3.6GB Parquet File (release build)
   
   #### Execution Time (s)
   Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4)
   -- | -- | -- | -- | --
   1 | 22.48 | 22.53 | 21.04 | 22.17
   4 | 12.24 | 14.4 | 12.49 | 12.73
   8 | 10.79 | 12.37 | 10.7 | 11.03
   16 | 10.52 | 12.67 | 10.78 | 10.85
   32 | 10.91 | 12.07 | 10.31 | 10.25
   64 | 10.21 | 12.97 | 12.34 | 11.62
   
   #### Peak Memory Usage (MB)
   Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4)
   -- | -- | -- | -- | --
   1 | 1753.3 | 1758 | 1757.1 | 1760
   4 | 2445.4 | 7104 | 5690.7 | 5684.2
   8 | 3387 | 7623.1 | 6642 | 7804
   16 | 5047.6 | 8262.6 | 8151 | 10437.7
   32 | 7683.6 | 7672.6 | 9358 | 11657.5
   64 | 10961.1 | 10370.2 | 10388 | 12898
   
   ### Test 2, Subset of 3 Columns, ~895MB Parquet File (release build)
   
   #### Execution Time (s)
   Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4)
   -- | -- | -- | -- | --
   1 | 3.57 | 3.15 | 3.38 | 3.39
   4 | 1.78 | 2.37 | 2.53 | 2.3
   8 | 1.45 | 2.07 | 2.58 | 2.23
   16 | 1.54 | 2.09 | 2.06 | 1.71
   32 | 1.7 | 2.1 | 2.08 | 1.65
   64 | 1.89 | 2.72 | 2.76 | 2.07
   
   #### Peak Memory Usage (MB)
   Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4)
   -- | -- | -- | -- | --
   1 | 450.6 | 451.6 | 448.9 | 449.4
   4 | 584.5 | 1659.1 | 1257.0 | 1284.4
   8 | 759.4 | 1939.7 | 1225.7 | 1339.2
   16 | 1045.8 | 2051.2 | 1359.8 | 1438.5
   32 | 1564.6 | 1899.7 | 1445.5 | 1545.3
   64 | 2318.8 | 1726.1 | 1732.0 | 1735.3
   
   ## Are these changes tested?
   
   Yes, by existing tests and ad hoc benchmarking.
   
   ## Are there any user-facing changes?
   
   No
   
   ## Next Steps
   
   So far, parquet serialization is only parallelized across RowGroups. This limits the available parallelism to the number of RowGroups we want in the output file, which in general can be as low as 1. Parquet files typically have many columns, so we could additionally parallelize at the column level for further speedups (see the sketch below).
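   To make the column-level idea concrete, a rough sketch under simplifying assumptions (`encode_column` is a hypothetical placeholder; real per-column encoding would go through the parquet crate's column writers, and this is not the implementation in the follow-up PR):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use tokio::task::JoinSet;

// Hypothetical placeholder for encoding a single Arrow column into a parquet
// column chunk; here it just returns a dummy byte buffer.
fn encode_column(column: ArrayRef) -> Vec<u8> {
    vec![0u8; column.len()]
}

async fn encode_batch_column_parallel(batch: RecordBatch) -> Vec<Vec<u8>> {
    let mut set = JoinSet::new();
    for (idx, column) in batch.columns().iter().enumerate() {
        let column = Arc::clone(column);
        // One blocking task per column: column encoding is CPU-bound and
        // independent, so it can proceed in parallel even when the file has
        // a single row group.
        set.spawn_blocking(move || (idx, encode_column(column)));
    }
    let mut chunks = vec![Vec::new(); batch.num_columns()];
    while let Some(res) = set.join_next().await {
        let (idx, bytes) = res.expect("encode task panicked");
        chunks[idx] = bytes;
    }
    chunks
}

#[tokio::main]
async fn main() {
    let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let b: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
    let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
    let chunks = encode_batch_column_parallel(batch).await;
    assert_eq!(chunks.len(), 2);
}
```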
   
   We could also break free of the Parallelism=RowGroupNumber limit if it were possible to concatenate (`ArrowColumnChunk`, `ColumnCloseResult`) tuples together before writing them into a RowGroup. This might not be possible efficiently, since `ArrowColumnChunk`s are already compressed. Aggregating column min/max statistics would be trivial, but merging distinct-value counts and bloom filters would not be.




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #7632: Support All Statistics and Enable Backpressure in Parallel Parquet Writer

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #7632:
URL: https://github.com/apache/arrow-datafusion/pull/7632#discussion_r1336307445


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -828,113 +833,84 @@ impl DataSink for ParquetSink {
     }
 }
 
-/// This is the return type when joining subtasks which are serializing parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
+/// encoded columns which can be appended to a SerializedRowGroupWriter
+type RBStreamSerializeResult =
+    Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;
 
 /// Parallelizes the serialization of a single parquet file, by first serializing N
 /// independent RecordBatch streams in parallel to parquet files in memory. Another
 /// task then stitches these independent files back together and streams this large
 /// single parquet file to an ObjectStore in multiple parts.
 async fn output_single_parquet_file_parallelized(
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
+    data: Vec<SendableRecordBatchStream>,
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
 ) -> Result<usize> {
     let mut row_count = 0;
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
+    let (serialize_tx, mut serialize_rx) =
+        mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(100);
+
+    // Create some Arc<> copies that we can move into launch_serialization task and still access later
+    let arc_props = Arc::new(parquet_props.clone());
+    let arc_props_clone = arc_props.clone();
+    let schema_clone = output_schema.clone();
+    let launch_serialization_task: JoinHandle<Result<(), DataFusionError>> =

Review Comment:
   It probably doesn't matter but if something goes wrong and `output_single_parquet_file_parallelized` returns an error, this code may well still launch tasks and try to buffer / serialize the streams.
   
   I think this could be avoided if we put all the handles into a `JoinSet` so when they were dropped all the tasks would be canceled: https://docs.rs/tokio/1.32.0/tokio/task/struct.JoinSet.html
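   For illustration, a minimal sketch of the suggested pattern (plain tokio, not DataFusion's actual types):

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set: JoinSet<usize> = JoinSet::new();
    for i in 0..8 {
        // Spawn work onto the set instead of collecting raw JoinHandles.
        set.spawn(async move { i * 2 });
    }
    // Drain results; if `set` were dropped instead (e.g. on an early `?`
    // return from the surrounding function), all remaining tasks would be
    // aborted automatically.
    while let Some(res) = set.join_next().await {
        let _value = res.unwrap();
    }
}
```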



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -828,113 +833,84 @@ impl DataSink for ParquetSink {
     }
 }
 
-/// This is the return type when joining subtasks which are serializing parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
+/// encoded columns which can be appended to a SerializedRowGroupWriter
+type RBStreamSerializeResult =
+    Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;
 
 /// Parallelizes the serialization of a single parquet file, by first serializing N
 /// independent RecordBatch streams in parallel to parquet files in memory. Another
 /// task then stitches these independent files back together and streams this large
 /// single parquet file to an ObjectStore in multiple parts.
 async fn output_single_parquet_file_parallelized(
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
+    data: Vec<SendableRecordBatchStream>,
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
 ) -> Result<usize> {
     let mut row_count = 0;
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
+    let (serialize_tx, mut serialize_rx) =
+        mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(100);
+
+    // Create some Arc<> copies that we can move into launch_serialization task and still access later
+    let arc_props = Arc::new(parquet_props.clone());
+    let arc_props_clone = arc_props.clone();
+    let schema_clone = output_schema.clone();
+    let launch_serialization_task: JoinHandle<Result<(), DataFusionError>> =
+        tokio::spawn(async move {
+            for mut stream in data {
+                let schema_desc = arrow_to_parquet_schema(&schema_clone)?;
+                let mut writer = ArrowRowGroupWriter::new(
+                    &schema_desc,
+                    &arc_props_clone,
+                    &schema_clone,
+                )?;
+                serialize_tx
+                    .send(tokio::spawn(async move {
+                        let mut inner_row_count = 0;
+                        while let Some(rb) = stream.next().await.transpose()? {
+                            inner_row_count += rb.num_rows();
+                            writer.write(&rb)?;
+                        }
+                        Ok((writer.close()?, inner_row_count))
+                    }))
+                    .await
+                    .map_err(|_| {

Review Comment:
   I think the only way the send will fail is if the receiver was dropped -- either due to early plan cancel or some other error
   
   Thus it might make sense to ignore the error and return (with a comment about rationale) rather than returning an error
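   A tiny standalone sketch of that pattern (illustrative only, not the PR's code):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(4);
    // Simulate the receiver being dropped, e.g. due to plan cancellation.
    drop(rx);

    // The only way `send` fails is if the receiver was dropped, so treat it
    // as a signal to stop producing rather than as an error to propagate.
    if tx.send(42).await.is_err() {
        // Receiver gone: return quietly instead of surfacing an error.
        return;
    }
}
```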



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -828,113 +833,84 @@ impl DataSink for ParquetSink {
     }
 }
 
-/// This is the return type when joining subtasks which are serializing parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
+/// encoded columns which can be appended to a SerializedRowGroupWriter
+type RBStreamSerializeResult =
+    Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;
 
 /// Parallelizes the serialization of a single parquet file, by first serializing N
 /// independent RecordBatch streams in parallel to parquet files in memory. Another
 /// task then stitches these independent files back together and streams this large
 /// single parquet file to an ObjectStore in multiple parts.
 async fn output_single_parquet_file_parallelized(
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
+    data: Vec<SendableRecordBatchStream>,
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
 ) -> Result<usize> {
     let mut row_count = 0;
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
+    let (serialize_tx, mut serialize_rx) =

Review Comment:
   my reading of this suggests it allows up to 100 row groups to be created in parallel, which likely results in more buffering than necessary.
   
   Rather than formulating this as an `mpsc::channel` it would be really neat to see it formulated as a `Stream<(ArrowColumnChunk, ColumnCloseResult)>`. 
   
   then, in combination with [`StreamExt::buffered()`](https://docs.rs/futures/0.3.28/futures/stream/trait.StreamExt.html#method.buffered) we could control the parallelism at the row group level
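   For reference, a minimal sketch of how `buffered` bounds concurrency (placeholder work in place of row group serialization):

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let results: Vec<usize> = stream::iter(0..16usize)
        // Each item becomes a future serializing one row group (placeholder here).
        .map(|row_group| async move { row_group * 2 })
        // At most 4 of these futures are polled concurrently, and results are
        // yielded in order, bounding how many row groups are buffered at once.
        .buffered(4)
        .collect()
        .await;
    assert_eq!(results.len(), 16);
}
```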





[GitHub] [arrow-datafusion] devinjdangelo commented on a diff in pull request #7632: Support All Statistics and Enable Backpressure in Parallel Parquet Writer

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #7632:
URL: https://github.com/apache/arrow-datafusion/pull/7632#discussion_r1336484162


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -828,113 +833,84 @@ impl DataSink for ParquetSink {
     }
 }
 
-/// This is the return type when joining subtasks which are serializing parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
+/// encoded columns which can be appended to a SerializedRowGroupWriter
+type RBStreamSerializeResult =
+    Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;
 
 /// Parallelizes the serialization of a single parquet file, by first serializing N
 /// independent RecordBatch streams in parallel to parquet files in memory. Another
 /// task then stitches these independent files back together and streams this large
 /// single parquet file to an ObjectStore in multiple parts.
 async fn output_single_parquet_file_parallelized(
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
+    data: Vec<SendableRecordBatchStream>,
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
 ) -> Result<usize> {
     let mut row_count = 0;
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
+    let (serialize_tx, mut serialize_rx) =

Review Comment:
   I am using `mpsc::channel` even more extensively in the new implementation (#7655). I have experimented with `StreamExt::buffered()` in the past, but it did not seem to leverage multiple CPU cores, whereas spawning tokio tasks communicating across a channel did. 
   
   I can revisit this though as it could simplify the code, and I may have just messed something up last time I tried it :thinking: .
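   One possible way to combine the two (purely a sketch, not what this PR or #7655 does): spawn each unit of work as a tokio task so it can run on other worker threads, and use `buffered` only to bound how many spawned tasks are outstanding, since `JoinHandle` is itself a future:

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let results: Vec<usize> = stream::iter(0..16usize)
        // tokio::spawn moves each unit of work onto the multi-threaded runtime,
        // so CPU-heavy serialization can use multiple cores.
        .map(|row_group| tokio::spawn(async move { row_group * 2 }))
        // JoinHandle is a Future, so `buffered` bounds outstanding tasks to 4
        // while still yielding results in order.
        .buffered(4)
        .map(|res| res.expect("task panicked"))
        .collect()
        .await;
    assert_eq!(results.len(), 16);
}
```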





[GitHub] [arrow-datafusion] devinjdangelo commented on pull request #7632: Support All Statistics and Enable Backpressure in Parallel Parquet Writer

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on PR #7632:
URL: https://github.com/apache/arrow-datafusion/pull/7632#issuecomment-1734613769

   @alamb Thank you for the review! I have significantly reworked this PR in a new PR #7655, relying primarily on column-wise parallelization rather than row-group-wise. The new approach is considerably more complex in code, but the performance advantage is huge: 20% faster and 90% lower memory overhead vs. this PR. 




Re: [PR] Support All Statistics and Enable Backpressure in Parallel Parquet Writer [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed pull request #7632: Support All Statistics and Enable Backpressure in Parallel Parquet Writer
URL: https://github.com/apache/arrow-datafusion/pull/7632

