Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/22 19:07:15 UTC

[GitHub] [arrow-rs] tustvold opened a new issue #1473: Requirements for Async Parquet API

tustvold opened a new issue #1473:
URL: https://github.com/apache/arrow-rs/issues/1473


   **Background**
   
   In #1154 I added an `async` parquet API in the form of `ParquetRecordBatchStream`. This was maximally async, that is, it made use of tokio's async IO traits to be as generic as possible. However, having experimented with this, I'm not sure that this design is quite right.
   
   In particular https://github.com/apache/arrow-datafusion/pull/1617 showed non-trivial performance regressions operating on local files. This is caused by three major factors:
   
   * Additional copying of buffers in tokio and the ParquetRecordBatchStream necessary to convert an async Read to a sync Read
   * Less parallelism, because on master the parquet decode takes place in a separate blocking thread
   * Overheads due to `tokio::fs::File` calling `spawn_blocking` for every IO operation
   
   This last point is pretty important and touches on something I was not aware of: tokio does not use an IO reactor for file IO like, say, boost::asio; instead it just calls `tokio::task::spawn_blocking` for every IO call. This somewhat undermines the concept of async file IO, as all you're doing is moving where `tokio::task::spawn_blocking` is called, and in fact you're moving it lower in the call chain where its overheads are less amortized.
   
   As part of further exploring this design space I created #1472 which instead of using the tokio IO traits, uses the non-async `ChunkReader` trait and `tokio::task::spawn_blocking`. Effectively this just upstreams logic from DataFusion's ParquetExec operator, and so perhaps unsurprisingly does not represent a performance regression.
   
   This is still technically an `async` API, however, I am aware that a number of people expressed interest in an `async` version of `ChunkReader` which suggests they want lower-level async-ness. It is also unclear that `ChunkReader` is quite right either - see #1163 and https://github.com/apache/arrow-datafusion/pull/1905.
   
   To further complicate matters, differing storage media have different trade-offs. In particular, when fetching from local disk or memory it may make sense to perform the most granular reads possible, potentially filtering out individual pages, columns, etc. However, when fetching data from object storage this is less clear cut. As each request has a cost and comes with non-trivial latency, there is likely a desire to coalesce proximate byte ranges into a single request, even if this results in reading more data than needed. As a result there is likely no general-purpose strategy for fetching data, and we therefore need the flexibility to allow this to be customized downstream.
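   
   As an illustration of the coalescing trade-off, here is a minimal sketch using only the standard library. The function name `coalesce_ranges` and the `max_gap` threshold are hypothetical and not part of any existing API; a real fetch strategy would probably also want to cap the size of a merged request.
   
   ```rust
   use std::ops::Range;
   
   /// Merge byte ranges whose gap is at most `max_gap` bytes, so that each
   /// merged range can be fetched with a single request.
   /// `max_gap` is a hypothetical tuning knob, not an existing API.
   pub fn coalesce_ranges(mut ranges: Vec<Range<usize>>, max_gap: usize) -> Vec<Range<usize>> {
       // Sort by start offset so that neighbours in the file are adjacent.
       ranges.sort_by_key(|r| r.start);
   
       let mut merged: Vec<Range<usize>> = Vec::with_capacity(ranges.len());
       for range in ranges {
           if let Some(last) = merged.last_mut() {
               // Close enough to the previous range: extend it, accepting
               // that the bytes in the gap are fetched and then discarded.
               if range.start <= last.end.saturating_add(max_gap) {
                   last.end = last.end.max(range.end);
                   continue;
               }
           }
           merged.push(range);
       }
       merged
   }
   ```
   
   With `max_gap = 0` only touching or overlapping ranges are merged, which is roughly what a local-disk reader might want, whereas an object store reader might choose a much larger gap to cut the number of requests.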
   
   Finally, there is an ongoing effort to introduce more parallelism into the parquet scan - https://github.com/apache/arrow-datafusion/pull/1990 - and whilst async is a concurrency primitive and not a parallelism primitive, the two concepts are closely related in practice.
   
   **Requirements**
   
   I think the requirements are therefore as follows
   
   1. Provide an `async` API that yields a stream of `Result<RecordBatch>`
   2. Use predicate and projection pushdown to filter the data to scan
   3. Separate identifying the byte ranges of column data to scan, from actually performing the scan
   4. Delegate fetching the corresponding byte ranges to an `async` trait, allowing downstream customisation of the fetch strategy
   5. Avoid copying the page data between buffers
   6. Avoid calling spawn_blocking where the read implementation will not block (e.g. already in-memory)
   7. Be extensible to support parallel column decoding (#TBD)
   8. Be extensible to support more advanced predicate pushdown (#1191)
   
   **Proposal**
   
   An intentionally vague proposal would be to extend https://github.com/apache/arrow-datafusion/pull/1617 replacing the use of `ChunkReader` with a `Storage` trait that might look something like
   
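   ```
   use async_trait::async_trait;
   
   #[async_trait]
   pub trait Storage {
     /// Hint the byte ranges that will be read, so they can be fetched up front
     async fn prefetch(&mut self, ranges: Vec<std::ops::Range<usize>>) -> Result<()>;
   
     /// Return the bytes for a single range, e.g. one column chunk
     async fn read(&mut self, range: std::ops::Range<usize>) -> Result<ByteBufferPtr>;
   }
   ```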
   
   `ParquetRecordBatchStreamBuilder` would use this trait to first read the footer, and then as part of `build()` invoke `prefetch()` with the determined byte ranges to scan. Finally `ParquetRecordBatchStream` would drive `Storage::read` with the individual column chunk ranges as needed by the stream.
   
   This will likely require some minor alterations to `SerializedPageReader` in order to avoid copying the data returned from `Storage::read` but I think this is worthwhile and will also benefit reading data from in-memory.
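   
   To make the in-memory case concrete, here is a rough sketch of how a read could hand back a view into a shared buffer without copying. It is deliberately synchronous and std-only so it stands alone: `BlockingStorage`, `InMemoryStorage` and `SharedBytes` are hypothetical names, `SharedBytes` is only a stand-in for `ByteBufferPtr`, and the `String` error type is a placeholder for the crate's own error.
   
   ```rust
   use std::ops::Range;
   use std::sync::Arc;
   
   /// A cheaply cloneable view into a shared buffer.
   /// Hypothetical stand-in for `ByteBufferPtr`, not the parquet crate's type.
   #[derive(Clone, Debug)]
   pub struct SharedBytes {
       data: Arc<Vec<u8>>,
       range: Range<usize>,
   }
   
   impl SharedBytes {
       pub fn as_slice(&self) -> &[u8] {
           &self.data[self.range.clone()]
       }
   }
   
   /// Synchronous analogue of the proposed `Storage` trait (an assumption
   /// made so that the sketch needs no async runtime).
   pub trait BlockingStorage {
       fn prefetch(&mut self, ranges: Vec<Range<usize>>) -> Result<(), String>;
       fn read(&mut self, range: Range<usize>) -> Result<SharedBytes, String>;
   }
   
   pub struct InMemoryStorage {
       data: Arc<Vec<u8>>,
   }
   
   impl InMemoryStorage {
       pub fn new(data: Vec<u8>) -> Self {
           Self { data: Arc::new(data) }
       }
   }
   
   impl BlockingStorage for InMemoryStorage {
       fn prefetch(&mut self, _ranges: Vec<Range<usize>>) -> Result<(), String> {
           // The data is already resident, so there is nothing to fetch.
           Ok(())
       }
   
       fn read(&mut self, range: Range<usize>) -> Result<SharedBytes, String> {
           if range.start > range.end || range.end > self.data.len() {
               return Err(format!(
                   "range {:?} out of bounds for {} bytes",
                   range,
                   self.data.len()
               ));
           }
           // Only the reference count is bumped; no bytes are copied.
           Ok(SharedBytes { data: Arc::clone(&self.data), range })
       }
   }
   ```
   
   Since neither method blocks here, an implementation along these lines would have no need for `spawn_blocking`, which is the case the requirements call out for data that is already in memory.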
   
   FYI @rdettai @yjshen @alamb @sunchao 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on issue #1473: Requirements for Async Parquet API

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #1473:
URL: https://github.com/apache/arrow-rs/issues/1473#issuecomment-1075537925


   https://github.com/apache/arrow-rs/issues/1474 is likely also related as `ByteBufferPtr` is currently an experimental API and it would be nice to not leak this into the public interface





[GitHub] [arrow-rs] alamb commented on issue #1473: Requirements for Async Parquet API

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #1473:
URL: https://github.com/apache/arrow-rs/issues/1473#issuecomment-1075699388


   Thank you for writing this up. I agree with most of it. As a validation of any new API / modification to an existing API, might I suggest we prototype using it (using datafusion) to ensure we have at least one reference implementation?
   
   I am thinking especially that the interaction with the `ObjectStore` interface and `Storage` may be somewhat tricky / non obvious, especially with the `async` nature of it all





[GitHub] [arrow-rs] tustvold commented on issue #1473: Requirements for Async Parquet API

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #1473:
URL: https://github.com/apache/arrow-rs/issues/1473#issuecomment-1085914732


   Ok so an update on this. I implemented this proposal in #1509, along with a number of modifications to help improve its performance - e.g. avoid copying buffers, etc... I then created an updated DataFusion branch to make use of [this](https://github.com/tustvold/arrow-datafusion/tree/parquet-async-wip). Unfortunately the performance was still significantly worse than master :cry:
   
   ## Scheduler Bench
   
   The DataFusion compile times were starting to grate, and there were far too many variables, so I created [scheduler-bench](https://github.com/tustvold/scheduler-bench) which is a reduced version of one of the queries that was exhibiting problematic runtime characteristics in the DataFusion benchmark.
   
   Running this we get the following:
   
   ```
   tokio_sync_file_test (2048): min: 0.0534s, max: 0.0660s, avg: 0.0548s, p95: 0.0557s
   tokio_par_async_spawn_blocking_test (2048): min: 0.0484s, max: 0.0813s, avg: 0.0585s, p95: 0.0754s
   tokio_par_async_blocking_test (2048): min: 0.0563s, max: 0.0721s, avg: 0.0584s, p95: 0.0600s
   tokio_par_sync_test (2048): min: 0.0536s, max: 0.0563s, avg: 0.0546s, p95: 0.0554s
   ```
   
   `tokio_sync_file_test` runs a spawn_blocking task that decodes and sends RecordBatch over an mpsc channel; a separate tokio task then performs the rest of the query. `tokio_par_sync_test` is broadly similar, but uses a regular tokio task instead of spawn_blocking for the reader.
   
   `tokio_par_async_spawn_blocking_test` and `tokio_par_async_blocking_test` use a tokio task to run the async `ParquetRecordBatchStream`, which communicates over an mpsc channel with a separate tokio task that performs the rest of the query. The difference is that `tokio_par_async_spawn_blocking_test` uses a separate spawn_blocking to read the column chunk data into the in-memory buffers, whereas `tokio_par_async_blocking_test` cheekily just uses blocking IO in a tokio worker thread.
   
   Immediately there is an obvious disparity between `tokio_par_async_spawn_blocking_test` and the other methods. Initially I thought this was overheads of spawn_blocking, and in earlier incarnations it certainly was, but with the switch to the `Storage` interface there is now only a single spawn_blocking per row group (of which there are 2 in this file).
   
   I therefore added a load of instrumentation and noticed something strange: the performance variance in `tokio_par_async_spawn_blocking_test` is entirely in the synchronous section that decodes RecordBatch from the already read byte arrays. This immediately suggested some sort of socket-locality funkiness, so I ran the test restricted to a single CPU core.
   
   ```
   $  taskset 0x1 cargo run --release
   tokio_sync_file_test (2048): min: 0.0593s, max: 0.0609s, avg: 0.0600s, p95: 0.0605s
   tokio_par_async_spawn_blocking_test (2048): min: 0.0619s, max: 0.0653s, avg: 0.0627s, p95: 0.0633s
   tokio_par_async_blocking_test (2048): min: 0.0619s, max: 0.0635s, avg: 0.0627s, p95: 0.0632s
   tokio_par_sync_test (2048): min: 0.0588s, max: 0.0609s, avg: 0.0595s, p95: 0.0600s
   ```
   
   Similarly, restricting the tokio worker pool to contain a single worker thread (note spawn_blocking will still use a separate thread) gives:
   
   ```
   tokio_sync_file_test (2048): min: 0.0495s, max: 0.0510s, avg: 0.0499s, p95: 0.0503s
   tokio_par_async_spawn_blocking_test (2048): min: 0.0469s, max: 0.0618s, avg: 0.0491s, p95: 0.0495s
   tokio_par_async_blocking_test (2048): min: 0.0522s, max: 0.0534s, avg: 0.0526s, p95: 0.0531s
   tokio_par_sync_test (2048): min: 0.0497s, max: 0.0507s, avg: 0.0501s, p95: 0.0505s
   ```
   
   The performance across the board is actually better than when the worker pool had more threads, and the performance disparity between the approaches is largely eliminated.
   
   Whilst `perf` has to be taken with a grain of salt, as it doesn't properly understand tokio's userland threading, I captured a profile of a run that first performed `tokio_par_async_spawn_blocking_test` and then `tokio_par_sync_test`.
   
   ![image](https://user-images.githubusercontent.com/1781103/161268498-fff27578-4ef4-4570-b387-7fe77593c96f.png)
   
   We can clearly see work ping-ponging between threads for `tokio_par_async_spawn_blocking_test` and transitioning to a significantly more stable pattern for `tokio_par_sync_test`, which perhaps unsurprisingly performs better.
   
   Removing the second tokio task also results in the same improvements to workload variability and therefore average runtime performance.
   
   ```
   tokio_async_spawn_blocking_test (2048): min: 0.0557s, max: 0.0587s, avg: 0.0565s, p95: 0.0582s
   tokio_async_blocking_test (2048): min: 0.0576s, max: 0.0599s, avg: 0.0584s, p95: 0.0593s
   ```
   
   ## Manual Threading
   
   So it certainly looks like tokio is not scheduling our tasks well, but perhaps there is something else at play. I therefore experimented with dropping tokio and manually threading the workload.
   
   Here we can see the performance of a single-threaded execution against a file, and against data already in memory:
   
   ```
   sync_file_test (2048): min: 0.0541s, max: 0.0758s, avg: 0.0548s, p95: 0.0550s
   sync_mem_test (2048): min: 0.0585s, max: 0.0616s, avg: 0.0598s, p95: 0.0609s
   ```
   
   And for completeness, the performance of a blocking implementation using two threads:
   
   ```
   par_sync_file_test (2048): min: 0.0542s, max: 0.0584s, avg: 0.0563s, p95: 0.0572s
   ```
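   
   For readers who want the shape of the two-thread setup without opening the benchmark repository, here is a rough std-only sketch. It is not the code from scheduler-bench: `sum_batches`, the `Batch` alias and the channel capacity of 2 are all assumptions made for illustration, and the "decode" step is a placeholder that just fabricates integers.
   
   ```rust
   use std::sync::mpsc::sync_channel;
   use std::thread;
   
   /// Hypothetical stand-in for a decoded `RecordBatch`: one column of integers.
   type Batch = Vec<u64>;
   
   /// Produce `num_batches` batches on a dedicated reader thread and
   /// aggregate them on the calling thread.
   pub fn sum_batches(num_batches: u64, batch_size: u64) -> u64 {
       // A bounded channel applies back-pressure: the reader blocks once two
       // batches are queued, rather than buffering the whole file.
       let (tx, rx) = sync_channel::<Batch>(2);
   
       let reader = thread::spawn(move || {
           for b in 0..num_batches {
               // Placeholder for the CPU-bound parquet decode step.
               let batch: Batch = (0..batch_size).map(|i| b * batch_size + i).collect();
               if tx.send(batch).is_err() {
                   break; // The consumer hung up; stop producing.
               }
           }
           // `tx` is dropped here, which closes the channel.
       });
   
       // The "rest of the query", reduced here to a simple sum.
       let total: u64 = rx.iter().map(|batch| batch.iter().sum::<u64>()).sum();
       reader.join().expect("reader thread panicked");
       total
   }
   ```
   
   Each OS thread keeps doing the same kind of work for the whole run, which is the property the tokio variants above appear to lose when tasks migrate between workers.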
   
   ## Conclusions
   
   I think this data is consistent with the following conclusions:
   
   * The performance of this query is largely dominated by the CPU-bound task of decoding the parquet bytes
   * There are overheads associated with buffering up parquet data ahead of time, on the order of ~5ms
   * There are CPU-locality effects on the order of ~30ms that impact the performance of decoding the parquet bytes
   
   I will come back to this next week with fresh eyes, but if the above is correct it would have the following implications:
   
   * The advantages of a fully async parquet implementation are likely to be undermined by the performance cost associated with the loss of thread-locality
   * Using tokio to schedule CPU-bound work within DataFusion is likely significantly more sub-optimal than we anticipated, especially for stateful operators
   
   





[GitHub] [arrow-rs] yjshen commented on issue #1473: Requirements for Async Parquet API

Posted by GitBox <gi...@apache.org>.
yjshen commented on issue #1473:
URL: https://github.com/apache/arrow-rs/issues/1473#issuecomment-1086543414


   Thanks @tustvold for the great writeup!

