You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/01 13:39:02 UTC

[GitHub] [arrow-rs] tustvold commented on issue #1473: Requirements for Async Parquet API

tustvold commented on issue #1473:
URL: https://github.com/apache/arrow-rs/issues/1473#issuecomment-1085914732


   Ok so an update on this. I implemented this proposal in #1509, along with a number of modifications to help improve its performance - e.g. avoid copying buffers, etc... I then created an updated DataFusion branch to make use of [this](https://github.com/tustvold/arrow-datafusion/tree/parquet-async-wip). Unfortunately the performance was still significantly worse than master :cry:
   
   ## Scheduler Bench
   
   The DataFusion compile times were starting to grate, and there were far too many variables, so I created [scheduler-bench](https://github.com/tustvold/scheduler-bench) which is a reduced version of one of the queries that was exhibiting problematic runtime characteristics in the DataFusion benchmark.
   
   Running this we get the following
   
   ```
   tokio_sync_file_test (2048): min: 0.0534s, max: 0.0660s, avg: 0.0548s, p95: 0.0557s
   tokio_par_async_spawn_blocking_test (2048): min: 0.0484s, max: 0.0813s, avg: 0.0585s, p95: 0.0754s
   tokio_par_async_blocking_test (2048): min: 0.0563s, max: 0.0721s, avg: 0.0584s, p95: 0.0600s
   tokio_par_sync_test (2048): min: 0.0536s, max: 0.0563s, avg: 0.0546s, p95: 0.0554s
   ```
   
   `tokio_sync_file_test` runs a spawn_blocking task that decodes and sends RecordBatch over an mpsc channel, a separate tokio task then performs the rest of the query. `tokio_par_sync_test` is broadly similar, but uses a regular tokio task instead of spawn_blocking for the reader.
   
   `tokio_par_async_spawn_blocking_test` and `tokio_par_async_blocking_test` use a tokio task to run the async `ParquetRecordBatchStream`, which communicates over an mpsc channel with a separate tokio task that performs the rest of the query. The difference is `tokio_par_async_spawn_blocking` uses a separate spawn_blocking to read the column chunk data into the in-memory buffers, whereas the other cheekily just uses blocking IO in a tokio worker thread.
   
   Immediately there is an obvious disparity between `tokio_par_async_spawn_blocking_test` and the other methods. Initially I thought this was overheads of spawn_blocking, and in earlier incarnations it certainly was, but with the switch to the `Storage` interface there is now only a single spawn_blocking per row group (of which there are 2 in this file).
   
   I therefore added a load of instrumentation and noticed something strange, the performance variance in `tokio_par_async_spawn_blocking_test` is entirely in the synchronous section that decodes RecordBatch from the already read byte arrays. This immediately suggested some sort of socket-locality funkiness so I ran the test restricted to a single CPU core.
   
   ```
   $  taskset 0x1 cargo run --release
   tokio_sync_file_test (2048): min: 0.0593s, max: 0.0609s, avg: 0.0600s, p95: 0.0605s
   tokio_par_async_spawn_blocking_test (2048): min: 0.0619s, max: 0.0653s, avg: 0.0627s, p95: 0.0633s
   tokio_par_async_blocking_test (2048): min: 0.0619s, max: 0.0635s, avg: 0.0627s, p95: 0.0632s
   tokio_par_sync_test (2048): min: 0.0588s, max: 0.0609s, avg: 0.0595s, p95: 0.0600s
   ```
   
   Similarly restricting the tokio worker pool to contain a single worker thread (note spawn_blocking will still use a separate thread)
   
   ```
   tokio_sync_file_test (2048): min: 0.0495s, max: 0.0510s, avg: 0.0499s, p95: 0.0503s
   tokio_par_async_spawn_blocking_test (2048): min: 0.0469s, max: 0.0618s, avg: 0.0491s, p95: 0.0495s
   tokio_par_async_blocking_test (2048): min: 0.0522s, max: 0.0534s, avg: 0.0526s, p95: 0.0531s
   tokio_par_sync_test (2048): min: 0.0497s, max: 0.0507s, avg: 0.0501s, p95: 0.0505s
   ```
   
   The performance across the board is actually better than when the worker pool had more threads, and the performance disparity between the approaches is largely eliminated.
   
   Whilst `perf` has to be taken with a grain of salt, as it doesn't properly understand tokio's userland threading, I captured a profile of a run that first performed `tokio_par_async_spawn_blocking_test` and then `tokio_par_sync_test`.
   
   ![image](https://user-images.githubusercontent.com/1781103/161268498-fff27578-4ef4-4570-b387-7fe77593c96f.png)
   
   We can clearly see work ping-ponging between threads for `tokio_par_async_spawn_blocking_test` and transitioning to a significantly more stable pattern for `tokio_par_sync_test`, which perhaps unsurprisingly performs better.
   
   Removing the second tokio task also results in the same improvements to workload variability and therefore average runtime performance.
   
   ```
   tokio_async_spawn_blocking_test (2048): min: 0.0557s, max: 0.0587s, avg: 0.0565s, p95: 0.0582s
   tokio_async_blocking_test (2048): min: 0.0576s, max: 0.0599s, avg: 0.0584s, p95: 0.0593s
   ```
   
   ## Manual Threading
   
   So it certainly looks like tokio is not scheduling our tasks well, but perhaps there is something else at play. I therefore experimented without using tokio and manually threading the workload
   
   Here we can see the performance of a single-threaded execution against a file, and against data already in memory 
   
   ```
   sync_file_test (2048): min: 0.0541s, max: 0.0758s, avg: 0.0548s, p95: 0.0550s
   sync_mem_test (2048): min: 0.0585s, max: 0.0616s, avg: 0.0598s, p95: 0.0609s
   ```
   
   And for completeness the performance of a blocking implementation using two threads
   
   ```
   par_sync_file_test (2048): min: 0.0542s, max: 0.0584s, avg: 0.0563s, p95: 0.0572s
   ```
   
   ## Conclusions
   
   I think this data is consistent with the following conclusions:
   
   * The performance of this query is largely dominated by the CPU-bound task of decoding the parquet bytes
   * There are overheads associated with buffering up parquet data ahead of time, on the order of ~5ms
   * There are CPU-locality effects on the order of ~30ms that impact the performance of decoding the parquet bytes
   
   I will come back to this next week with fresh eyes, but if the above is correct it would have the following implications:
   
   * The advantages of a fully async parquet implementation are likely to be undermined by the performance cost associated with the loss of thread-locality
   * Using tokio to schedule CPU-bound work within DataFusion is likely significantly more sub-optimal than we anticipated, especially for stateful operators
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org