Posted to user@arrow.apache.org by Matthew Corley <ma...@gmail.com> on 2020/07/15 22:43:23 UTC

PyArrow: Best approach to concurrency when using read_table()?

Currently, we are developing a Python client that reads parquet data stored
on S3 using PyArrow as an intermediary.  I am mostly looking to describe
how we are using PyArrow, get any feedback with respect to what we might be
missing, and also get some pointers about the best way to use PyArrow in
concurrent Python applications.

*Background: *
Our data tends to be quite wide (many hundreds of thousands up to millions
of columns) with mostly high-cardinality columns.  So, we have vertically
sharded our data, where each shard contains 1000 columns of data.

Within each shard, the data is further partitioned into several parquet
files that share the same schema (primarily done to ease the burden of data
writes by ensuring that each final parquet file is ~1 GB in size).

More concretely, the prefix structure on S3 might look something like this:
data/
  vertical_shard_id=1/
    00001.parquet
    00002.parquet

My understanding of the current implementation is that this call:

    read_table(source="s3://data/vertical_shard=1/",
               columns=["col1", "col2", ..., "colN"],
               use_threads=True, use_legacy_dataset=True,
               filesystem=s3fs)

evaluates as follows (a rough sketch of this flow follows the list):
   - read each parquet file (00001.parquet, etc.) as a PyArrow table in
     sequence; within each file, columns are read concurrently because
     use_threads=True
   - concatenate the tables together and return a single table
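
Concretely, I picture the logic as roughly equivalent to the sketch below
(the file names, column list, and filesystem object are hypothetical
stand-ins):

    import pyarrow as pa
    import pyarrow.parquet as pq
    import s3fs

    # Hypothetical stand-ins for the real shard contents and column selection.
    fs = s3fs.S3FileSystem()
    paths = ["data/vertical_shard_id=1/00001.parquet",
             "data/vertical_shard_id=1/00002.parquet"]
    columns = ["col1", "col2"]

    tables = []
    for path in paths:
        # Files are read one after another; within a file, columns are
        # decoded in parallel because use_threads=True.
        tables.append(pq.read_table(path, columns=columns,
                                    use_threads=True, filesystem=fs))

    result = pa.concat_tables(tables)  # single combined table, as read_table returns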

*My first question:*
Ignoring for a moment the question of whether this is a good idea for
performance... if I were to implement my own version of read_table() that
concurrently reads each parquet file within a vertical shard into a table,
e.g. using multiprocessing, and then concatenates them in the parent process,
does PyArrow provide any primitives that make it easier to do this without
the typical serialization overhead/limits?  From the docs, it seems that
there might be some APIs that would ease sharing memory between
subprocesses, but I'm not sure where to start.
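
To make the question concrete, the naive version I have in mind looks
something like the sketch below (paths and the helper are hypothetical).  As
I understand it, each table returned from the pool is pickled on its way
back to the parent, and that round trip is the overhead I'd like to avoid:

    from multiprocessing import Pool

    import pyarrow as pa
    import pyarrow.parquet as pq

    def read_one(path):
        # Hypothetical helper: column selection and the S3 filesystem are
        # omitted here for brevity.
        return pq.read_table(path, use_threads=True)

    if __name__ == "__main__":
        paths = ["data/vertical_shard_id=1/00001.parquet",
                 "data/vertical_shard_id=1/00002.parquet"]
        with Pool(processes=len(paths)) as pool:
            # Each child builds a Table, which is pickled/unpickled when it
            # is sent back to the parent -- the serialization cost in question.
            tables = pool.map(read_one, paths)
        result = pa.concat_tables(tables)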

*My second question: *
Right now, our client handles requests for data that are distributed across
vertical shards by doing the following, in sequence, for each shard
(sketched below):
 - read the shard using read_table()
 - convert to a pandas dataframe via to_pandas()
 - filter and post-process as needed (to some extent, a work-around for the
lack of row-group predicate pushdown when we initially started using pyarrow)
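
In code, the current per-shard flow is roughly the following (the shard ids,
columns, and pandas predicate are hypothetical):

    import pyarrow.parquet as pq
    import s3fs

    fs = s3fs.S3FileSystem()  # the filesystem object we pass as filesystem=

    frames = []
    for shard_id in (1, 2):  # hypothetical shard ids
        table = pq.read_table(f"s3://data/vertical_shard_id={shard_id}/",
                              columns=["col1", "col2"], use_threads=True,
                              use_legacy_dataset=True, filesystem=fs)
        df = table.to_pandas()
        # Filter in pandas, standing in for row-group predicate pushdown.
        df = df[df["col1"] > 0]  # hypothetical predicate
        frames.append(df)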

If we were to push the reading + processing/filtering of each shard off
into its own subprocess using multiprocessing, what is the best way to
share each dataframe back to the parent process (minimizing
copying/serialization overhead/etc)?  In particular, I wondered if
https://arrow.apache.org/docs/python/ipc.html#serializing-pandas-objects might
in some way prove useful, but I wasn't sure I fully understood the use
cases described in the documentation.

Re: PyArrow: Best approach to concurrency when using read_table()?

Posted by Micah Kornfield <em...@gmail.com>.
Hi Matthew,
I'm not an expert in this area, but to try to answer your questions:

does PyArrow provide any primitives that make it easier to do this without
> the typical serialization overhead/limits?  From the docs it seems that
> there might be some APIs that would ease sharing memory between
> subprocesses but I'm not sure where to start.


As far as I'm aware, Python doesn't provide shared memory primitives that
would make this absolutely zero cost.  The easiest ways to do this with
minimal overhead (and they share similar underlying infrastructure) are:
1.  Write serialized tables from the child process to well-known locations
that the parent process can read from via memory mapping.  This could be
done using Python's shared memory [1].  (A rough sketch of this option
follows the list.)
2.  Use Plasma to transfer objects.
3.  It doesn't exist yet to my knowledge, but a MemoryPool [2] that
allocates directly from shared memory would be a useful contribution, so one
could simply pass back the pointers to buffers.
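
For what it's worth, a minimal sketch of option 1 using a plain
memory-mapped file at a well-known path (rather than
multiprocessing.shared_memory; the path and function names here are
hypothetical) could look like:

    import pyarrow as pa
    import pyarrow.ipc as ipc

    def write_shard(table, path):
        # Runs in the child process: serialize the table to an Arrow IPC
        # file at a location the parent knows about.
        with pa.OSFile(path, "wb") as sink:
            writer = ipc.new_file(sink, table.schema)
            writer.write_table(table)
            writer.close()

    def read_shard(path):
        # Runs in the parent process: memory-map the file and read it back.
        # The resulting Table's buffers reference the mapped memory, so
        # this should avoid copying the data itself.
        with pa.memory_map(path, "r") as source:
            return ipc.open_file(source).read_all()

The same IPC payload could presumably be placed in a
multiprocessing.shared_memory block instead of a file, but the file-backed
variant above is the simplest starting point.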


I'm not sure about the second question.


[1] https://docs.python.org/3/library/multiprocessing.shared_memory.html
[2] https://arrow.apache.org/docs/python/generated/pyarrow.MemoryPool.html

