You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "Nick Gates (Jira)" <ji...@apache.org> on 2021/12/02 13:19:00 UTC

[jira] [Updated] (ARROW-14965) Contention when reading Parquet files with multi-threading

     [ https://issues.apache.org/jira/browse/ARROW-14965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nick Gates updated ARROW-14965:
-------------------------------
    Description: 
I'm attempting to read a table from multiple Parquet files where I already know which row_groups I want to read from each file. I also want to apply a filter expression while reading. To do this my code looks roughly like this:

 
{code:java}
def read_file(filepath):
    format = ds.ParquetFileFormat(...)
    fragment = format.make_fragment(filepath, row_groups=[0, 1, 2, ...])
    scanner = ds.Scanner.from_fragment(
        fragment, 
        use_threads=True,
        use_async=False,
        filter=...
    )
    return scanner.to_reader().read_all()

with ThreadPoolExecutor() as pool:
    pa.concat_tables(pool.map(read_file, file_paths)) {code}
Running with a ProcessPoolExecutor, each of my 13 read_file calls takes at most 2 seconds. However, with a ThreadPoolExecutor some of the read_file calls take 20+ seconds.

 

I've tried running this with various combinations of use_threads and use_async to try and see what's happening. The code blocks are sourced from py-spy, and identifying contention was done with viztracer.

 

*use_threads: False, use_async: False*
 * It looks like pyarrow._dataset.Scanner.to_reader doesn't release the GIL: [https://github.com/apache/arrow/blob/be9a22b9b76d9cd83d85d52ffc2844056d90f367/python/pyarrow/_dataset.pyx#L3278-L3283]
 * pyarrow._dataset.from_fragment seems to be contended. Py-spy suggests this is around getting the physical_schema from the fragment?

 
{code:java}
from_fragment (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
__pyx_getprop_7pyarrow_8_dataset_8Fragment_physical_schema (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
__pthread_cond_timedwait (libpthread-2.17.so) {code}
 

*use_threads: False, use_async: True*
 * There's no longer any contention for pyarrow._dataset.from_fragment
 * But there's lots of contention for pyarrow.lib.RecordBatchReader.read_all

 
{code:java}
arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
arrow::FutureImpl::Wait (pyarrow/libarrow.so.600) 
std::condition_variable::wait (libstdc++.so.6.0.19){code}
*use_threads: True, use_async: False*
 * Appears to be some contention on Scanner.to_reader
 * But most contention remains for RecordBatchReader.read_all

{code:java}
arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::FunctionIterator<arrow::dataset::(anonymous namespace)::SyncScanner::ScanBatches(arrow::Iterator<std::shared_ptr<arrow::dataset::ScanTask> >)::{lambda()#1}, arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
std::condition_variable::wait (libstdc++.so.6.0.19)
__pthread_cond_wait (libpthread-2.17.so) {code}
*use_threads: True, use_async: True*
 * Contention again mostly for RecordBatchReader.read_all, but seems to complete in ~12 seconds rather than 20

{code:java}
arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
arrow::FutureImpl::Wait (pyarrow/libarrow.so.600)
std::condition_variable::wait (libstdc++.so.6.0.19)
__pthread_cond_wait (libpthread-2.17.so) {code}
Is this expected behaviour? Or should it be possible to achieve the same performance from multi-threading as from multi-processing?

 

 

  was:
I'm attempting to read a table from multiple Parquet files where I already know which row_groups I want to read from each file. I also want to apply a filter expression while reading. To do this my code looks roughly like this:

 
{code:java}
def read_file():
    format = ds.ParquetFileFormat(...)
    fragment = format.make_fragment(filepath, row_groups=[0, 1, 2, ...])
    scanner = ds.Scanner.from_fragment(
        fragment, 
        use_threads=True,
        use_async=False,
        filter=...
    )
    return scanner.to_reader().read_all()

with ThreadPoolExecutor() as pool:
    pa.concat_tables(pool.map(read_file, file_paths)) {code}

Running with a ProcessPoolExecutor, each of my 13 read_file calls takes at most 2 seconds. However, with a ThreadPoolExecutor some of the read_file calls take 20+ seconds.

 

I've tried running this with various combinations of use_threads and use_async to try and see what's happening. The code blocks are sourced from py-spy, and identifying contention was done with viztracer.

 

*use_threads: False, use_async: False*
 * It looks like pyarrow._dataset.Scanner.to_reader doesn't release the GIL: [https://github.com/apache/arrow/blob/be9a22b9b76d9cd83d85d52ffc2844056d90f367/python/pyarrow/_dataset.pyx#L3278-L3283]
 * pyarrow._dataset.from_fragment seems to be contended. Py-spy suggests this is around getting the physical_schema from the fragment?

 
{code:java}
from_fragment (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
__pyx_getprop_7pyarrow_8_dataset_8Fragment_physical_schema (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
__pthread_cond_timedwait (libpthread-2.17.so) {code}
 

*use_threads: False, use_async: True*
 * There's no longer any contention for pyarrow._dataset.from_fragment
 * But there's lots of contention for pyarrow.lib.RecordBatchReader.read_all

 
{code:java}
arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
arrow::FutureImpl::Wait (pyarrow/libarrow.so.600) 
std::condition_variable::wait (libstdc++.so.6.0.19){code}
*use_threads: True, use_async: False*
 * Appears to be some contention on Scanner.to_reader
 * But most contention remains for RecordBatchReader.read_all

{code:java}
arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::FunctionIterator<arrow::dataset::(anonymous namespace)::SyncScanner::ScanBatches(arrow::Iterator<std::shared_ptr<arrow::dataset::ScanTask> >)::{lambda()#1}, arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
std::condition_variable::wait (libstdc++.so.6.0.19)
__pthread_cond_wait (libpthread-2.17.so) {code}
*use_threads: True, use_async: True*
 * Contention again mostly for RecordBatchReader.read_all, but seems to complete in ~12 seconds rather than 20

{code:java}
arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
arrow::FutureImpl::Wait (pyarrow/libarrow.so.600)
std::condition_variable::wait (libstdc++.so.6.0.19)
__pthread_cond_wait (libpthread-2.17.so) {code}

Is this expected behaviour? Or should it be possible to achieve the same performance from multi-threading as from multi-processing?

 

 


> Contention when reading Parquet files with multi-threading
> ----------------------------------------------------------
>
>                 Key: ARROW-14965
>                 URL: https://issues.apache.org/jira/browse/ARROW-14965
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Python
>    Affects Versions: 6.0.0
>            Reporter: Nick Gates
>            Priority: Minor
>
> I'm attempting to read a table from multiple Parquet files where I already know which row_groups I want to read from each file. I also want to apply a filter expression while reading. To do this my code looks roughly like this:
>  
> {code:java}
> def read_file(filepath):
>     format = ds.ParquetFileFormat(...)
>     fragment = format.make_fragment(filepath, row_groups=[0, 1, 2, ...])
>     scanner = ds.Scanner.from_fragment(
>         fragment, 
>         use_threads=True,
>         use_async=False,
>         filter=...
>     )
>     return scanner.to_reader().read_all()
> with ThreadPoolExecutor() as pool:
>     pa.concat_tables(pool.map(read_file, file_paths)) {code}
> Running with a ProcessPoolExecutor, each of my 13 read_file calls takes at most 2 seconds. However, with a ThreadPoolExecutor some of the read_file calls take 20+ seconds.
>  
> I've tried running this with various combinations of use_threads and use_async to try and see what's happening. The code blocks are sourced from py-spy, and identifying contention was done with viztracer.
>  
> *use_threads: False, use_async: False*
>  * It looks like pyarrow._dataset.Scanner.to_reader doesn't release the GIL: [https://github.com/apache/arrow/blob/be9a22b9b76d9cd83d85d52ffc2844056d90f367/python/pyarrow/_dataset.pyx#L3278-L3283]
>  * pyarrow._dataset.from_fragment seems to be contended. Py-spy suggests this is around getting the physical_schema from the fragment?
>  
> {code:java}
> from_fragment (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
> __pyx_getprop_7pyarrow_8_dataset_8Fragment_physical_schema (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
> __pthread_cond_timedwait (libpthread-2.17.so) {code}
>  
> *use_threads: False, use_async: True*
>  * There's no longer any contention for pyarrow._dataset.from_fragment
>  * But there's lots of contention for pyarrow.lib.RecordBatchReader.read_all
>  
> {code:java}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> arrow::FutureImpl::Wait (pyarrow/libarrow.so.600) 
> std::condition_variable::wait (libstdc++.so.6.0.19){code}
> *use_threads: True, use_async: False*
>  * Appears to be some contention on Scanner.to_reader
>  * But most contention remains for RecordBatchReader.read_all
> {code:java}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::FunctionIterator<arrow::dataset::(anonymous namespace)::SyncScanner::ScanBatches(arrow::Iterator<std::shared_ptr<arrow::dataset::ScanTask> >)::{lambda()#1}, arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> std::condition_variable::wait (libstdc++.so.6.0.19)
> __pthread_cond_wait (libpthread-2.17.so) {code}
> *use_threads: True, use_async: True*
>  * Contention again mostly for RecordBatchReader.read_all, but seems to complete in ~12 seconds rather than 20
> {code:java}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> arrow::FutureImpl::Wait (pyarrow/libarrow.so.600)
> std::condition_variable::wait (libstdc++.so.6.0.19)
> __pthread_cond_wait (libpthread-2.17.so) {code}
> Is this expected behaviour? Or should it be possible to achieve the same performance from multi-threading as from multi-processing?
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)