You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by Cindy McMullen <cm...@twitter.com> on 2022/02/03 13:30:55 UTC

Multi-threaded reads via ParquetFile API

Hi -

I'd like to ingest  batches within a Parquet file in parallel.  The client
(DGLDataset) needs to be thread-safe.  What's the best API for me to use
to do so?

Here's the metadata for one example file:

  <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
  created_by: parquet-mr version 1.12.0 (build
db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
  num_columns: 4
  num_rows: 1000000
  num_row_groups: 9997
  format_version: 1.0
  serialized_size: 17824741

I want the consumption of batches to be distributed among multiple
workers.  I'm currently trying something like this:

# Once per client

pqf = pq.ParquetFile(f, memory_map=True)


# Ideally, each worker can do this, but ParquetFile.iter_batches is
not thread-safe.  This makes intuitive sense. pq_batches =
pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True)



My workaround is to buffer these ParquetFile batches into DataFrame [], but
this is memory-intensive, so will not scale to multiple of these input
files.

What's a better PyArrow pattern to use so I can distribute batches to my
workers in a thread-safe manner?

Thanks --

Re: Multi-threaded reads via ParquetFile API

Posted by Cindy McMullen <cm...@twitter.com>.
We've actually POC'd a design for this around PubSub, so it sounds like
we're all thinking along the same lines.
I just wanted to make sure I'd exhausted all possibilities of
multi-threaded Parquet file client reads w/ PyArrow. It sounds like I have,
and PubSub is the best approach.

On Sat, Feb 5, 2022 at 9:42 PM Micah Kornfield <em...@gmail.com>
wrote:

> I was going to suggest something similar to Weston's advice if the
> bottleneck is downstream processing from Parquet.   Looking at the APIs
> linked, it's not entirely clear to me how well that would work.
>
> On Thu, Feb 3, 2022 at 11:58 PM Weston Pace <we...@gmail.com> wrote:
>
>> If you are aiming to speed up processing / consumption of batches then
>> you can use a queue.  For example:
>>
>> # Once per client
>> pqf = pq.ParquetFile(f, memory_map=True)
>> # maxsize will control how much you all buffer in RAM
>> queue = queue.Queue(maxsize=32)
>> start_workers(queue)
>> for batch in pqf.iter_batches(self.rows_per_batch,
>> use_pandas_metadata=True):
>>   queue.put(batch)
>> queue.join()
>> stop_workers()
>>
>> # Each worker would do
>> while not stopped:
>>     next_batch = queue.get()
>>     process_batch(next_batch)
>>     queue.task_done()
>>
>> On Thu, Feb 3, 2022 at 8:03 AM Cindy McMullen <cm...@twitter.com>
>> wrote:
>> >
>> > We can't use Beam to parallelize multiple file reads, b/c
>> GraphDataLoader is specific to the model being trained.  So multiple Beam
>> processes can't share the same model (until we move into DGL distributed
>> mode later this year).
>> >
>> > We're trying to optimize throughput of the GraphDataLoader
>> consume/process these Parquet files.
>> >
>> > On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <cm...@twitter.com>
>> wrote:
>> >>
>> >> The use case is for a GraphDataLoader to run w/ multiple threads.
>> GraphDataLoade invokes its DGLDataset, which loads these Parquet files to
>> convert into DGL-compatible graph objects.
>> >>
>> >>
>> >> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <em...@gmail.com>
>> wrote:
>> >>>
>> >>> Hi Cindy,
>> >>>>
>> >>>> I'd like to ingest  batches within a Parquet file in parallel.
>> >>>
>> >>> What is the motivation here?  Is it speeding up Parquet reading or
>> processing after the fact?
>> >>>
>> >>>
>> >>> Side note, the size of your row groups seems quite small (it might be
>> right for your specific use-case).
>> >>>
>> >>> Cheers,
>> >>> Micah
>> >>>
>> >>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <cm...@twitter.com>
>> wrote:
>> >>>>
>> >>>> Maybe -- will give it a try.  Thanks for the suggestion.
>> >>>>
>> >>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <pa...@gmail.com>
>> wrote:
>> >>>>>
>> >>>>> There is a parameter to iter_batches where you can pass in the
>> row_group number, or a list of row groups. Would this help to read the
>> Parquet file in parallel?
>> >>>>>
>> >>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <
>> cmcmullen@twitter.com> wrote:
>> >>>>>>
>> >>>>>> Hi -
>> >>>>>>
>> >>>>>> I'd like to ingest  batches within a Parquet file in parallel.
>> The client (DGLDataset) needs to be thread-safe.  What's the best API for
>> me to use  to do so?
>> >>>>>>
>> >>>>>> Here's the metadata for one example file:
>> >>>>>>
>> >>>>>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>> >>>>>>   created_by: parquet-mr version 1.12.0 (build
>> db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>> >>>>>>   num_columns: 4
>> >>>>>>   num_rows: 1000000
>> >>>>>>   num_row_groups: 9997
>> >>>>>>   format_version: 1.0
>> >>>>>>   serialized_size: 17824741
>> >>>>>>
>> >>>>>> I want the consumption of batches to be distributed among multiple
>> workers.  I'm currently trying something like this:
>> >>>>>>
>> >>>>>> # Once per client
>> >>>>>>
>> >>>>>> pqf = pq.ParquetFile(f, memory_map=True)
>> >>>>>>
>> >>>>>>
>> >>>>>> # Ideally, each worker can do this, but ParquetFile.iter_batches
>> is not thread-safe.  This makes intuitive sense.
>> >>>>>> pq_batches = pqf.iter_batches(self.rows_per_batch,
>> use_pandas_metadata=True)
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> My workaround is to buffer these ParquetFile batches into
>> DataFrame [], but this is memory-intensive, so will not scale to multiple
>> of these input files.
>> >>>>>>
>> >>>>>> What's a better PyArrow pattern to use so I can distribute batches
>> to my workers in a thread-safe manner?
>> >>>>>>
>> >>>>>> Thanks --
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>> --
>> >>>>> Partha Dutta
>> >>>>> partha.dutta@gmail.com
>>
>

Re: Multi-threaded reads via ParquetFile API

Posted by Micah Kornfield <em...@gmail.com>.
I was going to suggest something similar to Weston's advice if the
bottleneck is downstream processing from Parquet.   Looking at the APIs
linked, it's not entirely clear to me how well that would work.

On Thu, Feb 3, 2022 at 11:58 PM Weston Pace <we...@gmail.com> wrote:

> If you are aiming to speed up processing / consumption of batches then
> you can use a queue.  For example:
>
> # Once per client
> pqf = pq.ParquetFile(f, memory_map=True)
> # maxsize will control how much you all buffer in RAM
> queue = queue.Queue(maxsize=32)
> start_workers(queue)
> for batch in pqf.iter_batches(self.rows_per_batch,
> use_pandas_metadata=True):
>   queue.put(batch)
> queue.join()
> stop_workers()
>
> # Each worker would do
> while not stopped:
>     next_batch = queue.get()
>     process_batch(next_batch)
>     queue.task_done()
>
> On Thu, Feb 3, 2022 at 8:03 AM Cindy McMullen <cm...@twitter.com>
> wrote:
> >
> > We can't use Beam to parallelize multiple file reads, b/c
> GraphDataLoader is specific to the model being trained.  So multiple Beam
> processes can't share the same model (until we move into DGL distributed
> mode later this year).
> >
> > We're trying to optimize throughput of the GraphDataLoader
> consume/process these Parquet files.
> >
> > On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <cm...@twitter.com>
> wrote:
> >>
> >> The use case is for a GraphDataLoader to run w/ multiple threads.
> GraphDataLoade invokes its DGLDataset, which loads these Parquet files to
> convert into DGL-compatible graph objects.
> >>
> >>
> >> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <em...@gmail.com>
> wrote:
> >>>
> >>> Hi Cindy,
> >>>>
> >>>> I'd like to ingest  batches within a Parquet file in parallel.
> >>>
> >>> What is the motivation here?  Is it speeding up Parquet reading or
> processing after the fact?
> >>>
> >>>
> >>> Side note, the size of your row groups seems quite small (it might be
> right for your specific use-case).
> >>>
> >>> Cheers,
> >>> Micah
> >>>
> >>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <cm...@twitter.com>
> wrote:
> >>>>
> >>>> Maybe -- will give it a try.  Thanks for the suggestion.
> >>>>
> >>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <pa...@gmail.com>
> wrote:
> >>>>>
> >>>>> There is a parameter to iter_batches where you can pass in the
> row_group number, or a list of row groups. Would this help to read the
> Parquet file in parallel?
> >>>>>
> >>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <cm...@twitter.com>
> wrote:
> >>>>>>
> >>>>>> Hi -
> >>>>>>
> >>>>>> I'd like to ingest  batches within a Parquet file in parallel.  The
> client (DGLDataset) needs to be thread-safe.  What's the best API for me to
> use  to do so?
> >>>>>>
> >>>>>> Here's the metadata for one example file:
> >>>>>>
> >>>>>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
> >>>>>>   created_by: parquet-mr version 1.12.0 (build
> db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
> >>>>>>   num_columns: 4
> >>>>>>   num_rows: 1000000
> >>>>>>   num_row_groups: 9997
> >>>>>>   format_version: 1.0
> >>>>>>   serialized_size: 17824741
> >>>>>>
> >>>>>> I want the consumption of batches to be distributed among multiple
> workers.  I'm currently trying something like this:
> >>>>>>
> >>>>>> # Once per client
> >>>>>>
> >>>>>> pqf = pq.ParquetFile(f, memory_map=True)
> >>>>>>
> >>>>>>
> >>>>>> # Ideally, each worker can do this, but ParquetFile.iter_batches is
> not thread-safe.  This makes intuitive sense.
> >>>>>> pq_batches = pqf.iter_batches(self.rows_per_batch,
> use_pandas_metadata=True)
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> My workaround is to buffer these ParquetFile batches into DataFrame
> [], but this is memory-intensive, so will not scale to multiple of these
> input files.
> >>>>>>
> >>>>>> What's a better PyArrow pattern to use so I can distribute batches
> to my workers in a thread-safe manner?
> >>>>>>
> >>>>>> Thanks --
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Partha Dutta
> >>>>> partha.dutta@gmail.com
>

Re: Multi-threaded reads via ParquetFile API

Posted by Weston Pace <we...@gmail.com>.
If you are aiming to speed up processing / consumption of batches then
you can use a queue.  For example:

# Once per client
pqf = pq.ParquetFile(f, memory_map=True)
# maxsize will control how much you all buffer in RAM
queue = queue.Queue(maxsize=32)
start_workers(queue)
for batch in pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True):
  queue.put(batch)
queue.join()
stop_workers()

# Each worker would do
while not stopped:
    next_batch = queue.get()
    process_batch(next_batch)
    queue.task_done()

On Thu, Feb 3, 2022 at 8:03 AM Cindy McMullen <cm...@twitter.com> wrote:
>
> We can't use Beam to parallelize multiple file reads, b/c GraphDataLoader is specific to the model being trained.  So multiple Beam processes can't share the same model (until we move into DGL distributed mode later this year).
>
> We're trying to optimize throughput of the GraphDataLoader consume/process these Parquet files.
>
> On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <cm...@twitter.com> wrote:
>>
>> The use case is for a GraphDataLoader to run w/ multiple threads.  GraphDataLoade invokes its DGLDataset, which loads these Parquet files to convert into DGL-compatible graph objects.
>>
>>
>> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <em...@gmail.com> wrote:
>>>
>>> Hi Cindy,
>>>>
>>>> I'd like to ingest  batches within a Parquet file in parallel.
>>>
>>> What is the motivation here?  Is it speeding up Parquet reading or processing after the fact?
>>>
>>>
>>> Side note, the size of your row groups seems quite small (it might be right for your specific use-case).
>>>
>>> Cheers,
>>> Micah
>>>
>>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <cm...@twitter.com> wrote:
>>>>
>>>> Maybe -- will give it a try.  Thanks for the suggestion.
>>>>
>>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <pa...@gmail.com> wrote:
>>>>>
>>>>> There is a parameter to iter_batches where you can pass in the row_group number, or a list of row groups. Would this help to read the Parquet file in parallel?
>>>>>
>>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <cm...@twitter.com> wrote:
>>>>>>
>>>>>> Hi -
>>>>>>
>>>>>> I'd like to ingest  batches within a Parquet file in parallel.  The client (DGLDataset) needs to be thread-safe.  What's the best API for me to use  to do so?
>>>>>>
>>>>>> Here's the metadata for one example file:
>>>>>>
>>>>>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>>>>>>   created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>>>>>   num_columns: 4
>>>>>>   num_rows: 1000000
>>>>>>   num_row_groups: 9997
>>>>>>   format_version: 1.0
>>>>>>   serialized_size: 17824741
>>>>>>
>>>>>> I want the consumption of batches to be distributed among multiple workers.  I'm currently trying something like this:
>>>>>>
>>>>>> # Once per client
>>>>>>
>>>>>> pqf = pq.ParquetFile(f, memory_map=True)
>>>>>>
>>>>>>
>>>>>> # Ideally, each worker can do this, but ParquetFile.iter_batches is not thread-safe.  This makes intuitive sense.
>>>>>> pq_batches = pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True)
>>>>>>
>>>>>>
>>>>>>
>>>>>> My workaround is to buffer these ParquetFile batches into DataFrame [], but this is memory-intensive, so will not scale to multiple of these input files.
>>>>>>
>>>>>> What's a better PyArrow pattern to use so I can distribute batches to my workers in a thread-safe manner?
>>>>>>
>>>>>> Thanks --
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Partha Dutta
>>>>> partha.dutta@gmail.com

Re: Multi-threaded reads via ParquetFile API

Posted by Cindy McMullen <cm...@twitter.com>.
We can't use Beam to parallelize multiple file reads, b/c GraphDataLoader
is specific to the model being trained.  So multiple Beam processes can't
share the same model (until we move into DGL distributed mode later this
year).

We're trying to optimize throughput of the GraphDataLoader consume/process
these Parquet files.

On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <cm...@twitter.com>
wrote:

> The use case is for a GraphDataLoader
> <https://docs.dgl.ai/en/0.6.x/api/python/dgl.dataloading.html#dgl.dataloading.pytorch.GraphDataLoader> to
> run w/ multiple threads.  GraphDataLoade invokes its DGLDataset
> <https://docs.dgl.ai/en/0.6.x/api/python/dgl.data.html#dgl.data.DGLDataset>,
> which loads these Parquet files to convert into DGL-compatible
> graph objects.
>
>
> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <em...@gmail.com>
> wrote:
>
>> Hi Cindy,
>>
>>> I'd like to ingest  batches within a Parquet file in parallel.
>>
>> What is the motivation here?  Is it speeding up Parquet reading or
>> processing after the fact?
>>
>>
>> Side note, the size of your row groups seems quite small (it might be
>> right for your specific use-case).
>>
>> Cheers,
>> Micah
>>
>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <cm...@twitter.com>
>> wrote:
>>
>>> Maybe -- will give it a try.  Thanks for the suggestion.
>>>
>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <pa...@gmail.com>
>>> wrote:
>>>
>>>> There is a parameter to iter_batches where you can pass in the
>>>> row_group number, or a list of row groups. Would this help to read the
>>>> Parquet file in parallel?
>>>>
>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <cm...@twitter.com>
>>>> wrote:
>>>>
>>>>> Hi -
>>>>>
>>>>> I'd like to ingest  batches within a Parquet file in parallel.  The
>>>>> client (DGLDataset) needs to be thread-safe.  What's the best API for me to
>>>>> use  to do so?
>>>>>
>>>>> Here's the metadata for one example file:
>>>>>
>>>>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>>>>>   created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>>>>   num_columns: 4
>>>>>   num_rows: 1000000
>>>>>   num_row_groups: 9997
>>>>>   format_version: 1.0
>>>>>   serialized_size: 17824741
>>>>>
>>>>> I want the consumption of batches to be distributed among multiple
>>>>> workers.  I'm currently trying something like this:
>>>>>
>>>>> # Once per client
>>>>>
>>>>> pqf = pq.ParquetFile(f, memory_map=True)
>>>>>
>>>>>
>>>>> # Ideally, each worker can do this, but ParquetFile.iter_batches is not thread-safe.  This makes intuitive sense. pq_batches = pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True)
>>>>>
>>>>>
>>>>>
>>>>> My workaround is to buffer these ParquetFile batches into DataFrame
>>>>> [], but this is memory-intensive, so will not scale to multiple of these
>>>>> input files.
>>>>>
>>>>> What's a better PyArrow pattern to use so I can distribute batches to
>>>>> my workers in a thread-safe manner?
>>>>>
>>>>> Thanks --
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Partha Dutta
>>>> partha.dutta@gmail.com
>>>>
>>>

Re: Multi-threaded reads via ParquetFile API

Posted by Cindy McMullen <cm...@twitter.com>.
The use case is for a GraphDataLoader
<https://docs.dgl.ai/en/0.6.x/api/python/dgl.dataloading.html#dgl.dataloading.pytorch.GraphDataLoader>
to
run w/ multiple threads.  GraphDataLoade invokes its DGLDataset
<https://docs.dgl.ai/en/0.6.x/api/python/dgl.data.html#dgl.data.DGLDataset>,
which loads these Parquet files to convert into DGL-compatible
graph objects.


On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <em...@gmail.com>
wrote:

> Hi Cindy,
>
>> I'd like to ingest  batches within a Parquet file in parallel.
>
> What is the motivation here?  Is it speeding up Parquet reading or
> processing after the fact?
>
>
> Side note, the size of your row groups seems quite small (it might be
> right for your specific use-case).
>
> Cheers,
> Micah
>
> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <cm...@twitter.com>
> wrote:
>
>> Maybe -- will give it a try.  Thanks for the suggestion.
>>
>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <pa...@gmail.com>
>> wrote:
>>
>>> There is a parameter to iter_batches where you can pass in the row_group
>>> number, or a list of row groups. Would this help to read the Parquet file
>>> in parallel?
>>>
>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <cm...@twitter.com>
>>> wrote:
>>>
>>>> Hi -
>>>>
>>>> I'd like to ingest  batches within a Parquet file in parallel.  The
>>>> client (DGLDataset) needs to be thread-safe.  What's the best API for me to
>>>> use  to do so?
>>>>
>>>> Here's the metadata for one example file:
>>>>
>>>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>>>>   created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>>>   num_columns: 4
>>>>   num_rows: 1000000
>>>>   num_row_groups: 9997
>>>>   format_version: 1.0
>>>>   serialized_size: 17824741
>>>>
>>>> I want the consumption of batches to be distributed among multiple
>>>> workers.  I'm currently trying something like this:
>>>>
>>>> # Once per client
>>>>
>>>> pqf = pq.ParquetFile(f, memory_map=True)
>>>>
>>>>
>>>> # Ideally, each worker can do this, but ParquetFile.iter_batches is not thread-safe.  This makes intuitive sense. pq_batches = pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True)
>>>>
>>>>
>>>>
>>>> My workaround is to buffer these ParquetFile batches into DataFrame [],
>>>> but this is memory-intensive, so will not scale to multiple of these input
>>>> files.
>>>>
>>>> What's a better PyArrow pattern to use so I can distribute batches to
>>>> my workers in a thread-safe manner?
>>>>
>>>> Thanks --
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>> --
>>> Partha Dutta
>>> partha.dutta@gmail.com
>>>
>>

Re: Multi-threaded reads via ParquetFile API

Posted by Micah Kornfield <em...@gmail.com>.
Hi Cindy,

> I'd like to ingest  batches within a Parquet file in parallel.

What is the motivation here?  Is it speeding up Parquet reading or
processing after the fact?


Side note, the size of your row groups seems quite small (it might be right
for your specific use-case).

Cheers,
Micah

On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <cm...@twitter.com> wrote:

> Maybe -- will give it a try.  Thanks for the suggestion.
>
> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <pa...@gmail.com>
> wrote:
>
>> There is a parameter to iter_batches where you can pass in the row_group
>> number, or a list of row groups. Would this help to read the Parquet file
>> in parallel?
>>
>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <cm...@twitter.com>
>> wrote:
>>
>>> Hi -
>>>
>>> I'd like to ingest  batches within a Parquet file in parallel.  The
>>> client (DGLDataset) needs to be thread-safe.  What's the best API for me to
>>> use  to do so?
>>>
>>> Here's the metadata for one example file:
>>>
>>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>>>   created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>>   num_columns: 4
>>>   num_rows: 1000000
>>>   num_row_groups: 9997
>>>   format_version: 1.0
>>>   serialized_size: 17824741
>>>
>>> I want the consumption of batches to be distributed among multiple
>>> workers.  I'm currently trying something like this:
>>>
>>> # Once per client
>>>
>>> pqf = pq.ParquetFile(f, memory_map=True)
>>>
>>>
>>> # Ideally, each worker can do this, but ParquetFile.iter_batches is not thread-safe.  This makes intuitive sense. pq_batches = pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True)
>>>
>>>
>>>
>>> My workaround is to buffer these ParquetFile batches into DataFrame [],
>>> but this is memory-intensive, so will not scale to multiple of these input
>>> files.
>>>
>>> What's a better PyArrow pattern to use so I can distribute batches to my
>>> workers in a thread-safe manner?
>>>
>>> Thanks --
>>>
>>>
>>>
>>>
>>>
>>>
>>
>> --
>> Partha Dutta
>> partha.dutta@gmail.com
>>
>

Re: Multi-threaded reads via ParquetFile API

Posted by Cindy McMullen <cm...@twitter.com>.
Maybe -- will give it a try.  Thanks for the suggestion.

On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <pa...@gmail.com> wrote:

> There is a parameter to iter_batches where you can pass in the row_group
> number, or a list of row groups. Would this help to read the Parquet file
> in parallel?
>
> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <cm...@twitter.com>
> wrote:
>
>> Hi -
>>
>> I'd like to ingest  batches within a Parquet file in parallel.  The
>> client (DGLDataset) needs to be thread-safe.  What's the best API for me to
>> use  to do so?
>>
>> Here's the metadata for one example file:
>>
>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>>   created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>   num_columns: 4
>>   num_rows: 1000000
>>   num_row_groups: 9997
>>   format_version: 1.0
>>   serialized_size: 17824741
>>
>> I want the consumption of batches to be distributed among multiple
>> workers.  I'm currently trying something like this:
>>
>> # Once per client
>>
>> pqf = pq.ParquetFile(f, memory_map=True)
>>
>>
>> # Ideally, each worker can do this, but ParquetFile.iter_batches is not thread-safe.  This makes intuitive sense. pq_batches = pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True)
>>
>>
>>
>> My workaround is to buffer these ParquetFile batches into DataFrame [],
>> but this is memory-intensive, so will not scale to multiple of these input
>> files.
>>
>> What's a better PyArrow pattern to use so I can distribute batches to my
>> workers in a thread-safe manner?
>>
>> Thanks --
>>
>>
>>
>>
>>
>>
>
> --
> Partha Dutta
> partha.dutta@gmail.com
>

Re: Multi-threaded reads via ParquetFile API

Posted by Partha Dutta <pa...@gmail.com>.
There is a parameter to iter_batches where you can pass in the row_group
number, or a list of row groups. Would this help to read the Parquet file
in parallel?

On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <cm...@twitter.com> wrote:

> Hi -
>
> I'd like to ingest  batches within a Parquet file in parallel.  The client
> (DGLDataset) needs to be thread-safe.  What's the best API for me to use
> to do so?
>
> Here's the metadata for one example file:
>
>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>   created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>   num_columns: 4
>   num_rows: 1000000
>   num_row_groups: 9997
>   format_version: 1.0
>   serialized_size: 17824741
>
> I want the consumption of batches to be distributed among multiple
> workers.  I'm currently trying something like this:
>
> # Once per client
>
> pqf = pq.ParquetFile(f, memory_map=True)
>
>
> # Ideally, each worker can do this, but ParquetFile.iter_batches is not thread-safe.  This makes intuitive sense. pq_batches = pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True)
>
>
>
> My workaround is to buffer these ParquetFile batches into DataFrame [],
> but this is memory-intensive, so will not scale to multiple of these input
> files.
>
> What's a better PyArrow pattern to use so I can distribute batches to my
> workers in a thread-safe manner?
>
> Thanks --
>
>
>
>
>
>

-- 
Partha Dutta
partha.dutta@gmail.com