You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@arrow.apache.org by Corey Nolet <cj...@gmail.com> on 2018/05/16 17:02:24 UTC

PyArrow & Python Multiprocessing

I've been reading through the PyArrow documentation and trying to
understand how to use the tool effectively for IPC (using zero-copy).

I'm on a system with 586 cores & 1TB of ram. I'm using Panda's Dataframes
to process several 10's of gigs of data in memory and the pickling that is
done by Python's multiprocessing API is very wasteful.

I'm running a little hand-built map-reduce where I chunk the dataframe into
N_mappers number of chunks, run some processing on them, then run some
number N_reducers to finalize the operation. What I'd like to be able to do
is chunk up the dataframe into Arrow Buffer objects and just have each
mapped task read their respective Buffer object with the guarantee of
zero-copy.

I see there's a couple Filesystem abstractions for doing memory-mapped
files. Durability isn't something I need and I'm willing to forego the
expense of putting the files on disk.

Is it possible to write the data directly to memory and pass just the
reference around to the different processes? What's the recommended way to
accomplish my goal here?


Thanks in advance!

Re: PyArrow & Python Multiprocessing

Posted by Robert Nishihara <ro...@gmail.com>.
You're welcome!

On Wed, May 16, 2018 at 6:13 PM Corey Nolet <cj...@gmail.com> wrote:

> I must say, I’m super excited about using Arrow and Plasma.
>
> The code you just posted worked for me at home and I’m sure I’ll figure
> out what I was doing wrong tomorrow at work.
>
> Anyways, thanks so much for your help and fast replies!
>
> Sent from my iPhone
>
> > On May 16, 2018, at 7:42 PM, Robert Nishihara <ro...@gmail.com>
> wrote:
> >
> > You should be able to do something like the following.
> >
> > # Start the store.
> > plasma_store -s /tmp/store -m 1000000000
> >
> > Then in Python, do the following:
> >
> > import pandas as pd
> > import pyarrow.plasma as plasma
> > import numpy as np
> >
> > client = plasma.connect('/tmp/store', '', 0)
> > series = pd.Series(np.zeros(100))
> > object_id = client.put(series)
> >
> > And yes, I would create a separate Plasma client for each process. I
> don't
> > think you'll be able to pickle a Plasma client object successfully (it
> has
> > a socket connection to the store).
> >
> > On Wed, May 16, 2018 at 3:43 PM Corey Nolet <cj...@gmail.com> wrote:
> >
> >> Robert,
> >>
> >> Thank you for the quick response. I've been playing around for a few
> hours
> >> to get a feel for how this works.
> >>
> >> If I understand correctly, it's better to have the Plasma client objects
> >> instantiated within each separate process? Weird things seemed to happen
> >> when I attempted to share a single one. I was assuming that the pickle
> >> serialization by python multiprocessing would have been serializing the
> >> connection info and re-instantiating on the other side but that didn't
> seem
> >> to be the case.
> >>
> >> I managed to load up a gigantic set of CSV files into Dataframes. Now
> I'm
> >> attempting to read the chunks, perform a groupby-aggregate, and write
> the
> >> results back to the Plasma store. Unless I'm mistaken, there doesn't
> seem
> >> to be a very direct way of accomplishing this. When I tried converting
> the
> >> Series object into a Plasma Array and just doing a client.put(array) I
> get
> >> a pickling error. Unless maybe I'm misunderstanding the architecture
> here,
> >> I believe that error would have been referring to attempts to serialize
> the
> >> object into a file? I would hope that the data isn't all being sent to
> the
> >> single Plasma server (or sent over sockets for that matter).
> >>
> >> What would be the recommended strategy for serializing Pandas Series
> >> objects? I really like the StreamWriter concept here but there does not
> >> seem to be a direct way (or documentation) to accomplish this.
> >>
> >> Thanks again.
> >>
> >> On Wed, May 16, 2018 at 1:28 PM, Robert Nishihara <
> >> robertnishihara@gmail.com
> >>> wrote:
> >>
> >>> Take a look at the Plasma object store
> >>> https://arrow.apache.org/docs/python/plasma.html.
> >>>
> >>> Here's an example using it (along with multiprocessing to sort a pandas
> >>> dataframe)
> >>> https://github.com/apache/arrow/blob/master/python/
> >>> examples/plasma/sorting/sort_df.py.
> >>> It's possible the example is a bit out of date.
> >>>
> >>> You may be interested in taking a look at Ray
> >>> https://github.com/ray-project/ray. We use Plasma/Arrow under the hood
> >> to
> >>> do all of these things but hide a lot of the bookkeeping (like object
> ID
> >>> generation). For your setting, you can think of it as a replacement for
> >>> Python multiprocessing that automatically uses shared memory and Arrow
> >> for
> >>> serialization.
> >>>
> >>>> On Wed, May 16, 2018 at 10:02 AM Corey Nolet <cj...@gmail.com>
> wrote:
> >>>>
> >>>> I've been reading through the PyArrow documentation and trying to
> >>>> understand how to use the tool effectively for IPC (using zero-copy).
> >>>>
> >>>> I'm on a system with 586 cores & 1TB of ram. I'm using Panda's
> >> Dataframes
> >>>> to process several 10's of gigs of data in memory and the pickling
> that
> >>> is
> >>>> done by Python's multiprocessing API is very wasteful.
> >>>>
> >>>> I'm running a little hand-built map-reduce where I chunk the dataframe
> >>> into
> >>>> N_mappers number of chunks, run some processing on them, then run some
> >>>> number N_reducers to finalize the operation. What I'd like to be able
> >> to
> >>> do
> >>>> is chunk up the dataframe into Arrow Buffer objects and just have each
> >>>> mapped task read their respective Buffer object with the guarantee of
> >>>> zero-copy.
> >>>>
> >>>> I see there's a couple Filesystem abstractions for doing memory-mapped
> >>>> files. Durability isn't something I need and I'm willing to forego the
> >>>> expense of putting the files on disk.
> >>>>
> >>>> Is it possible to write the data directly to memory and pass just the
> >>>> reference around to the different processes? What's the recommended
> way
> >>> to
> >>>> accomplish my goal here?
> >>>>
> >>>>
> >>>> Thanks in advance!
> >>>>
> >>>
> >>
>

Re: PyArrow & Python Multiprocessing

Posted by Corey Nolet <cj...@gmail.com>.
I must say, I’m super excited about using Arrow and Plasma.

The code you just posted worked for me at home and I’m sure I’ll figure out what I was doing wrong tomorrow at work. 

Anyways, thanks so much for your help and fast replies! 

Sent from my iPhone

> On May 16, 2018, at 7:42 PM, Robert Nishihara <ro...@gmail.com> wrote:
> 
> You should be able to do something like the following.
> 
> # Start the store.
> plasma_store -s /tmp/store -m 1000000000
> 
> Then in Python, do the following:
> 
> import pandas as pd
> import pyarrow.plasma as plasma
> import numpy as np
> 
> client = plasma.connect('/tmp/store', '', 0)
> series = pd.Series(np.zeros(100))
> object_id = client.put(series)
> 
> And yes, I would create a separate Plasma client for each process. I don't
> think you'll be able to pickle a Plasma client object successfully (it has
> a socket connection to the store).
> 
> On Wed, May 16, 2018 at 3:43 PM Corey Nolet <cj...@gmail.com> wrote:
> 
>> Robert,
>> 
>> Thank you for the quick response. I've been playing around for a few hours
>> to get a feel for how this works.
>> 
>> If I understand correctly, it's better to have the Plasma client objects
>> instantiated within each separate process? Weird things seemed to happen
>> when I attempted to share a single one. I was assuming that the pickle
>> serialization by python multiprocessing would have been serializing the
>> connection info and re-instantiating on the other side but that didn't seem
>> to be the case.
>> 
>> I managed to load up a gigantic set of CSV files into Dataframes. Now I'm
>> attempting to read the chunks, perform a groupby-aggregate, and write the
>> results back to the Plasma store. Unless I'm mistaken, there doesn't seem
>> to be a very direct way of accomplishing this. When I tried converting the
>> Series object into a Plasma Array and just doing a client.put(array) I get
>> a pickling error. Unless maybe I'm misunderstanding the architecture here,
>> I believe that error would have been referring to attempts to serialize the
>> object into a file? I would hope that the data isn't all being sent to the
>> single Plasma server (or sent over sockets for that matter).
>> 
>> What would be the recommended strategy for serializing Pandas Series
>> objects? I really like the StreamWriter concept here but there does not
>> seem to be a direct way (or documentation) to accomplish this.
>> 
>> Thanks again.
>> 
>> On Wed, May 16, 2018 at 1:28 PM, Robert Nishihara <
>> robertnishihara@gmail.com
>>> wrote:
>> 
>>> Take a look at the Plasma object store
>>> https://arrow.apache.org/docs/python/plasma.html.
>>> 
>>> Here's an example using it (along with multiprocessing to sort a pandas
>>> dataframe)
>>> https://github.com/apache/arrow/blob/master/python/
>>> examples/plasma/sorting/sort_df.py.
>>> It's possible the example is a bit out of date.
>>> 
>>> You may be interested in taking a look at Ray
>>> https://github.com/ray-project/ray. We use Plasma/Arrow under the hood
>> to
>>> do all of these things but hide a lot of the bookkeeping (like object ID
>>> generation). For your setting, you can think of it as a replacement for
>>> Python multiprocessing that automatically uses shared memory and Arrow
>> for
>>> serialization.
>>> 
>>>> On Wed, May 16, 2018 at 10:02 AM Corey Nolet <cj...@gmail.com> wrote:
>>>> 
>>>> I've been reading through the PyArrow documentation and trying to
>>>> understand how to use the tool effectively for IPC (using zero-copy).
>>>> 
>>>> I'm on a system with 586 cores & 1TB of ram. I'm using Panda's
>> Dataframes
>>>> to process several 10's of gigs of data in memory and the pickling that
>>> is
>>>> done by Python's multiprocessing API is very wasteful.
>>>> 
>>>> I'm running a little hand-built map-reduce where I chunk the dataframe
>>> into
>>>> N_mappers number of chunks, run some processing on them, then run some
>>>> number N_reducers to finalize the operation. What I'd like to be able
>> to
>>> do
>>>> is chunk up the dataframe into Arrow Buffer objects and just have each
>>>> mapped task read their respective Buffer object with the guarantee of
>>>> zero-copy.
>>>> 
>>>> I see there's a couple Filesystem abstractions for doing memory-mapped
>>>> files. Durability isn't something I need and I'm willing to forego the
>>>> expense of putting the files on disk.
>>>> 
>>>> Is it possible to write the data directly to memory and pass just the
>>>> reference around to the different processes? What's the recommended way
>>> to
>>>> accomplish my goal here?
>>>> 
>>>> 
>>>> Thanks in advance!
>>>> 
>>> 
>> 

Re: PyArrow & Python Multiprocessing

Posted by Robert Nishihara <ro...@gmail.com>.
You should be able to do something like the following.

# Start the store.
plasma_store -s /tmp/store -m 1000000000

Then in Python, do the following:

import pandas as pd
import pyarrow.plasma as plasma
import numpy as np

client = plasma.connect('/tmp/store', '', 0)
series = pd.Series(np.zeros(100))
object_id = client.put(series)

And yes, I would create a separate Plasma client for each process. I don't
think you'll be able to pickle a Plasma client object successfully (it has
a socket connection to the store).

On Wed, May 16, 2018 at 3:43 PM Corey Nolet <cj...@gmail.com> wrote:

> Robert,
>
> Thank you for the quick response. I've been playing around for a few hours
> to get a feel for how this works.
>
> If I understand correctly, it's better to have the Plasma client objects
> instantiated within each separate process? Weird things seemed to happen
> when I attempted to share a single one. I was assuming that the pickle
> serialization by python multiprocessing would have been serializing the
> connection info and re-instantiating on the other side but that didn't seem
> to be the case.
>
> I managed to load up a gigantic set of CSV files into Dataframes. Now I'm
> attempting to read the chunks, perform a groupby-aggregate, and write the
> results back to the Plasma store. Unless I'm mistaken, there doesn't seem
> to be a very direct way of accomplishing this. When I tried converting the
> Series object into a Plasma Array and just doing a client.put(array) I get
> a pickling error. Unless maybe I'm misunderstanding the architecture here,
> I believe that error would have been referring to attempts to serialize the
> object into a file? I would hope that the data isn't all being sent to the
> single Plasma server (or sent over sockets for that matter).
>
> What would be the recommended strategy for serializing Pandas Series
> objects? I really like the StreamWriter concept here but there does not
> seem to be a direct way (or documentation) to accomplish this.
>
> Thanks again.
>
> On Wed, May 16, 2018 at 1:28 PM, Robert Nishihara <
> robertnishihara@gmail.com
> > wrote:
>
> > Take a look at the Plasma object store
> > https://arrow.apache.org/docs/python/plasma.html.
> >
> > Here's an example using it (along with multiprocessing to sort a pandas
> > dataframe)
> > https://github.com/apache/arrow/blob/master/python/
> > examples/plasma/sorting/sort_df.py.
> > It's possible the example is a bit out of date.
> >
> > You may be interested in taking a look at Ray
> > https://github.com/ray-project/ray. We use Plasma/Arrow under the hood
> to
> > do all of these things but hide a lot of the bookkeeping (like object ID
> > generation). For your setting, you can think of it as a replacement for
> > Python multiprocessing that automatically uses shared memory and Arrow
> for
> > serialization.
> >
> > On Wed, May 16, 2018 at 10:02 AM Corey Nolet <cj...@gmail.com> wrote:
> >
> > > I've been reading through the PyArrow documentation and trying to
> > > understand how to use the tool effectively for IPC (using zero-copy).
> > >
> > > I'm on a system with 586 cores & 1TB of ram. I'm using Panda's
> Dataframes
> > > to process several 10's of gigs of data in memory and the pickling that
> > is
> > > done by Python's multiprocessing API is very wasteful.
> > >
> > > I'm running a little hand-built map-reduce where I chunk the dataframe
> > into
> > > N_mappers number of chunks, run some processing on them, then run some
> > > number N_reducers to finalize the operation. What I'd like to be able
> to
> > do
> > > is chunk up the dataframe into Arrow Buffer objects and just have each
> > > mapped task read their respective Buffer object with the guarantee of
> > > zero-copy.
> > >
> > > I see there's a couple Filesystem abstractions for doing memory-mapped
> > > files. Durability isn't something I need and I'm willing to forego the
> > > expense of putting the files on disk.
> > >
> > > Is it possible to write the data directly to memory and pass just the
> > > reference around to the different processes? What's the recommended way
> > to
> > > accomplish my goal here?
> > >
> > >
> > > Thanks in advance!
> > >
> >
>

Re: PyArrow & Python Multiprocessing

Posted by Corey Nolet <cj...@gmail.com>.
Robert,

Thank you for the quick response. I've been playing around for a few hours
to get a feel for how this works.

If I understand correctly, it's better to have the Plasma client objects
instantiated within each separate process? Weird things seemed to happen
when I attempted to share a single one. I was assuming that the pickle
serialization by python multiprocessing would have been serializing the
connection info and re-instantiating on the other side but that didn't seem
to be the case.

I managed to load up a gigantic set of CSV files into Dataframes. Now I'm
attempting to read the chunks, perform a groupby-aggregate, and write the
results back to the Plasma store. Unless I'm mistaken, there doesn't seem
to be a very direct way of accomplishing this. When I tried converting the
Series object into a Plasma Array and just doing a client.put(array) I get
a pickling error. Unless maybe I'm misunderstanding the architecture here,
I believe that error would have been referring to attempts to serialize the
object into a file? I would hope that the data isn't all being sent to the
single Plasma server (or sent over sockets for that matter).

What would be the recommended strategy for serializing Pandas Series
objects? I really like the StreamWriter concept here but there does not
seem to be a direct way (or documentation) to accomplish this.

Thanks again.

On Wed, May 16, 2018 at 1:28 PM, Robert Nishihara <robertnishihara@gmail.com
> wrote:

> Take a look at the Plasma object store
> https://arrow.apache.org/docs/python/plasma.html.
>
> Here's an example using it (along with multiprocessing to sort a pandas
> dataframe)
> https://github.com/apache/arrow/blob/master/python/
> examples/plasma/sorting/sort_df.py.
> It's possible the example is a bit out of date.
>
> You may be interested in taking a look at Ray
> https://github.com/ray-project/ray. We use Plasma/Arrow under the hood to
> do all of these things but hide a lot of the bookkeeping (like object ID
> generation). For your setting, you can think of it as a replacement for
> Python multiprocessing that automatically uses shared memory and Arrow for
> serialization.
>
> On Wed, May 16, 2018 at 10:02 AM Corey Nolet <cj...@gmail.com> wrote:
>
> > I've been reading through the PyArrow documentation and trying to
> > understand how to use the tool effectively for IPC (using zero-copy).
> >
> > I'm on a system with 586 cores & 1TB of ram. I'm using Panda's Dataframes
> > to process several 10's of gigs of data in memory and the pickling that
> is
> > done by Python's multiprocessing API is very wasteful.
> >
> > I'm running a little hand-built map-reduce where I chunk the dataframe
> into
> > N_mappers number of chunks, run some processing on them, then run some
> > number N_reducers to finalize the operation. What I'd like to be able to
> do
> > is chunk up the dataframe into Arrow Buffer objects and just have each
> > mapped task read their respective Buffer object with the guarantee of
> > zero-copy.
> >
> > I see there's a couple Filesystem abstractions for doing memory-mapped
> > files. Durability isn't something I need and I'm willing to forego the
> > expense of putting the files on disk.
> >
> > Is it possible to write the data directly to memory and pass just the
> > reference around to the different processes? What's the recommended way
> to
> > accomplish my goal here?
> >
> >
> > Thanks in advance!
> >
>

Re: PyArrow & Python Multiprocessing

Posted by Robert Nishihara <ro...@gmail.com>.
Take a look at the Plasma object store
https://arrow.apache.org/docs/python/plasma.html.

Here's an example using it (along with multiprocessing to sort a pandas
dataframe)
https://github.com/apache/arrow/blob/master/python/examples/plasma/sorting/sort_df.py.
It's possible the example is a bit out of date.

You may be interested in taking a look at Ray
https://github.com/ray-project/ray. We use Plasma/Arrow under the hood to
do all of these things but hide a lot of the bookkeeping (like object ID
generation). For your setting, you can think of it as a replacement for
Python multiprocessing that automatically uses shared memory and Arrow for
serialization.

On Wed, May 16, 2018 at 10:02 AM Corey Nolet <cj...@gmail.com> wrote:

> I've been reading through the PyArrow documentation and trying to
> understand how to use the tool effectively for IPC (using zero-copy).
>
> I'm on a system with 586 cores & 1TB of ram. I'm using Panda's Dataframes
> to process several 10's of gigs of data in memory and the pickling that is
> done by Python's multiprocessing API is very wasteful.
>
> I'm running a little hand-built map-reduce where I chunk the dataframe into
> N_mappers number of chunks, run some processing on them, then run some
> number N_reducers to finalize the operation. What I'd like to be able to do
> is chunk up the dataframe into Arrow Buffer objects and just have each
> mapped task read their respective Buffer object with the guarantee of
> zero-copy.
>
> I see there's a couple Filesystem abstractions for doing memory-mapped
> files. Durability isn't something I need and I'm willing to forego the
> expense of putting the files on disk.
>
> Is it possible to write the data directly to memory and pass just the
> reference around to the different processes? What's the recommended way to
> accomplish my goal here?
>
>
> Thanks in advance!
>