Posted to jira@arrow.apache.org by "Weston Pace (Jira)" <ji...@apache.org> on 2021/12/03 03:00:00 UTC

[jira] [Commented] (ARROW-14965) [Python][C++] Contention when reading Parquet files with multi-threading

    [ https://issues.apache.org/jira/browse/ARROW-14965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452707#comment-17452707 ] 

Weston Pace commented on ARROW-14965:
-------------------------------------

> It looks like pyarrow._dataset.Scanner.to_reader doesn't release the GIL: https://github.com/apache/arrow/blob/be9a22b9b76d9cd83d85d52ffc2844056d90f367/python/pyarrow/_dataset.pyx#L3278-L3283

This is a fast operation.  I see no reason to release the GIL here.

> XYZ seems to be contended

Arrow has its own internal thread pool.  Typically, tasks are added to the thread pool and the calling thread blocks until all of the necessary tasks have finished.  So this contention is expected.  As long as we aren't holding the GIL while waiting, it should be relatively harmless.
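
To illustrate the pattern, here is a minimal sketch using only the Python standard library.  It is an analogy, not Arrow's actual implementation: SHARED_POOL, work, caller and run_callers are hypothetical names, and the sleep stands in for native work that does not hold the GIL.  Several calling threads submit tasks to one shared pool and then park until their tasks finish, which is the kind of wait a profiler reports as contention.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait

# One shared worker pool, standing in for a process-wide CPU thread pool.
# This is an analogy only; it is not how Arrow is implemented.
SHARED_POOL = ThreadPoolExecutor(max_workers=4)


def work(n):
    # Placeholder for native work; time.sleep releases the GIL.
    time.sleep(0.01)
    return n * n


def caller(values):
    # Hand the tasks to the shared pool ...
    futures = [SHARED_POOL.submit(work, v) for v in values]
    # ... then block here until every task has finished.  A sampling
    # profiler would show this thread sitting in a condition-variable wait.
    wait(futures)
    return sum(f.result() for f in futures)


def run_callers(n_callers, values):
    # Start several calling threads that all share SHARED_POOL.
    results = [None] * n_callers

    def target(i):
        results[i] = caller(values)

    threads = [threading.Thread(target=target, args=(i,)) for i in range(n_callers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

With more callers than workers, each caller spends most of its time waiting even though the pool itself stays busy, so a long wait in the calling thread does not by itself mean that throughput is lost.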

> Running with a ProcessPoolExecutor, each of my 13 read_file calls takes at most 2 seconds. However, with a ThreadPoolExecutor some of the read_file calls take 20+ seconds.
...
> Contention again mostly for RecordBatchReader.read_all, but seems to complete in ~12 seconds rather than 20

Are you saying that all 13 read_file calls run in parallel and complete in 2 seconds?  If ProcessPoolExecutor can read all 13 files in 2 seconds but ThreadPoolExecutor requires 12 seconds, that is an issue.

However, there could certainly be reasons for it.  For example, ARROW-14974.  Using ThreadPoolExecutor means you are sharing a single CPU thread pool.  Using ProcessPoolExecutor means each process will have its own CPU thread pool.  In theory this shouldn't be a problem: we defer slow I/O tasks to the I/O thread pool, so only compute tasks should land on the CPU thread pool.  Since the CPU thread pool is the same size as the number of compute units on the system, there wouldn't be much advantage to having multiple CPU thread pools (e.g. it doesn't matter whether I number crunch on 10 threads or 20 threads if I only have 8 cores).  In practice we could certainly have mistakes.

I tried to run a number of experiments myself.  I created 13 parquet files, each 12MB.  I tried reading them with a ThreadPoolExecutor, a ProcessPoolExecutor, and a dataset.  I didn't use a filter or limit the row groups (I'll experiment with these later) but just read in the entire dataset.
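
For anyone who wants to repeat this kind of comparison, a timing harness could look roughly like the sketch below.  This is not the script I used: time_reads, _timed_call and fake_read are hypothetical names, and fake_read is a placeholder that should be replaced with the real read_file function.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def _timed_call(args):
    # Module-level helper so that it can also be pickled for a process pool.
    read_fn, path = args
    start = time.perf_counter()
    result = read_fn(path)
    return result, time.perf_counter() - start


def time_reads(executor_cls, read_fn, paths):
    """Run read_fn over paths with the given executor class.

    Returns (total_seconds, per_call_seconds, results).
    """
    start = time.perf_counter()
    with executor_cls() as pool:
        outcomes = list(pool.map(_timed_call, [(read_fn, p) for p in paths]))
    total = time.perf_counter() - start
    results = [result for result, _ in outcomes]
    per_call = [seconds for _, seconds in outcomes]
    return total, per_call, results


def fake_read(path):
    # Hypothetical stand-in for read_file; swap in the real reader.
    time.sleep(0.01)
    return len(path)
```

Calling time_reads(ThreadPoolExecutor, read_file, file_paths) and then the same with ProcessPoolExecutor gives both the wall-clock total and the per-call times, which is the distinction that matters here.  With a process pool the read function has to be defined at module level so it can be pickled, and the result tables are copied back to the parent process, which adds to the measured time.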

With cold I/O I had a lot of variability, and the three approaches performed more or less the same (ProcessPoolExecutor seemed a bit slower, but I didn't run enough experiments to verify).

With hot I/O the ProcessPoolExecutor performed much worse than the other two approaches (which performed similarly to each other).

So, basically, I am not reproducing the same behavior yet.  I will try adding a filter.



> [Python][C++] Contention when reading Parquet files with multi-threading
> ------------------------------------------------------------------------
>
>                 Key: ARROW-14965
>                 URL: https://issues.apache.org/jira/browse/ARROW-14965
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++, Python
>    Affects Versions: 6.0.0
>            Reporter: Nick Gates
>            Priority: Minor
>
> I'm attempting to read a table from multiple Parquet files where I already know which row_groups I want to read from each file. I also want to apply a filter expression while reading. To do this my code looks roughly like this:
>  
> {code:java}
> def read_file(filepath):
>     format = ds.ParquetFileFormat(...)
>     fragment = format.make_fragment(filepath, row_groups=[0, 1, 2, ...])
>     scanner = ds.Scanner.from_fragment(
>         fragment, 
>         use_threads=True,
>         use_async=False,
>         filter=...
>     )
>     return scanner.to_reader().read_all()
> with ThreadPoolExecutor() as pool:
>     pa.concat_tables(pool.map(read_file, file_paths)) {code}
> Running with a ProcessPoolExecutor, each of my 13 read_file calls takes at most 2 seconds. However, with a ThreadPoolExecutor some of the read_file calls take 20+ seconds.
>  
> I've tried running this with various combinations of use_threads and use_async to try and see what's happening. The code blocks are sourced from py-spy, and identifying contention was done with viztracer.
>  
> *use_threads: False, use_async: False*
>  * It looks like pyarrow._dataset.Scanner.to_reader doesn't release the GIL: [https://github.com/apache/arrow/blob/be9a22b9b76d9cd83d85d52ffc2844056d90f367/python/pyarrow/_dataset.pyx#L3278-L3283]
>  * pyarrow._dataset.from_fragment seems to be contended. Py-spy suggests this is around getting the physical_schema from the fragment?
>  
> {code:java}
> from_fragment (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
> __pyx_getprop_7pyarrow_8_dataset_8Fragment_physical_schema (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
> __pthread_cond_timedwait (libpthread-2.17.so) {code}
>  
> *use_threads: False, use_async: True*
>  * There's no longer any contention for pyarrow._dataset.from_fragment
>  * But there's lots of contention for pyarrow.lib.RecordBatchReader.read_all
>  
> {code:java}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> arrow::FutureImpl::Wait (pyarrow/libarrow.so.600) 
> std::condition_variable::wait (libstdc++.so.6.0.19){code}
> *use_threads: True, use_async: False*
>  * Appears to be some contention on Scanner.to_reader
>  * But most contention remains for RecordBatchReader.read_all
> {code:java}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::FunctionIterator<arrow::dataset::(anonymous namespace)::SyncScanner::ScanBatches(arrow::Iterator<std::shared_ptr<arrow::dataset::ScanTask> >)::{lambda()#1}, arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> std::condition_variable::wait (libstdc++.so.6.0.19)
> __pthread_cond_wait (libpthread-2.17.so) {code}
> *use_threads: True, use_async: True*
>  * Contention again mostly for RecordBatchReader.read_all, but seems to complete in ~12 seconds rather than 20
> {code:java}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> arrow::FutureImpl::Wait (pyarrow/libarrow.so.600)
> std::condition_variable::wait (libstdc++.so.6.0.19)
> __pthread_cond_wait (libpthread-2.17.so) {code}
> Is this expected behaviour? Or should it be possible to achieve the same performance from multi-threading as from multi-processing?
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)