Posted to dev@arrow.apache.org by Weston Pace <we...@gmail.com> on 2021/02/03 03:02:28 UTC

Threading Improvements Proposal

This is a follow up to a discussion from last September [3].  I've
been investigating Arrow's use of threading and I/O and I believe
there are some improvements that could be made.  Arrow currently
supports two threading options (single thread and "per-core" thread
pool).  Both of these approaches are hindered if blocking I/O is
performed on a CPU worker thread.

This is somewhat alleviated by using background threads for I/O (in the
readahead iterator) but this implementation is not complete and does
not allow for nested parallelism.  I would like to convert Arrow's I/O
operations to an asynchronous model (expanding on the existing futures
API).  I have already converted the CSV reader in this fashion [2] as
a proof of concept.

I have written a more detailed proposal here [1].  Please feel free to
suggest improvements or alternate approaches.  Also, please let me
know if I missed any goals or considerations I should keep in mind.

Also, hello, this email is a bit of an introduction.  I have
previously made one or two small comments/changes but I am hoping to
be more involved going forwards.  I've mostly worked on proprietary
test and measurement software but have recently joined Ursa Computing
which will allow me more time to work on Arrow.

Thanks,

Weston Pace

[1] https://docs.google.com/document/d/1tO2WwYL-G2cB_MCPqYguKjKkRT7mZ8C2Gc9ONvspfgo/edit?usp=sharing
[2] https://github.com/apache/arrow/pull/9095
[3] https://mail-archives.apache.org/mod_mbox/arrow-dev/202009.mbox/%3CCAJPUwMDmU3rFt6Upyis%3DyXB%3DECkmrjdncgR9xj%3DDFapJt9FfUg%40mail.gmail.com%3E

Re: Threading Improvements Proposal

Posted by Antoine Pitrou <an...@python.org>.
On 17/02/2021 at 05:20, Micah Kornfield wrote:
>>
>> If a method could potentially run some kind of long term blocking I/O
>> wait then yes.  So reading / writing tables & datasets, IPC,
>> filesystem APIs, etc. will all need to adapt.  It doesn't have to be
>> all at once.  CPU only functions would remain as they are.  So table
>> manipulation, compute functions, etc. would remain as they are.  For
>> example, there would never be any advantage to creating an
>> asynchronous method to drop a column from a table.
> 
> 
> My main concern is around the "viralness" of Futures. I think they are good
> in some cases but can become hard to reason about/error prone if you aren't
> used to working with them day in/out.  I don't have any concrete
> recommendation at this point, just something we should be careful about
> when doing the refactoring.

I think it's ok to expose synchronous facades at key points in the Arrow
API, to avoid having to deal with futures when you don't need to.

Regards

Antoine.

Re: Threading Improvements Proposal

Posted by Micah Kornfield <em...@gmail.com>.
>
> If a method could potentially run some kind of long term blocking I/O
> wait then yes.  So reading / writing tables & datasets, IPC,
> filesystem APIs, etc. will all need to adapt.  It doesn't have to be
> all at once.  CPU only functions would remain as they are.  So table
> manipulation, compute functions, etc. would remain as they are.  For
> example, there would never be any advantage to creating an
> asynchronous method to drop a column from a table.


My main concern is around the "viralness" of Futures. I think they are good
in some cases but can become hard to reason about/error prone if you aren't
used to working with them day in/out.  I don't have any concrete
recommendation at this point, just something we should be careful about
when doing the refactoring.

On Tue, Feb 16, 2021 at 4:43 AM Antoine Pitrou <an...@python.org> wrote:

>
> Thanks for this useful writeup and the ensuing discussion.  What you're
> proposing basically looks sound to me.
>
> Regards
>
> Antoine.

Re: Threading Improvements Proposal

Posted by Antoine Pitrou <an...@python.org>.
Thanks for this useful writeup and the ensuing discussion.  What you're
proposing basically looks sound to me.

Regards

Antoine.


On 16/02/2021 at 09:29, Weston Pace wrote:
> Thanks for the input.  I appreciate you both taking the time to look
> through this.  I'll consolidate the points here.
> 
> From Wes:
> 
>> I hypothesize that the bottom of the stack is a thread pool with a
>> queue-per-thread that implements work stealing.
> 
> Yes.  I think there may be two pools (one for I/O and one for CPU) but
> for the work I'm intending (unblocking deadlock and getting I/O off
> CPU pool) what is there today is sufficient.  Moving to work stealing
> would be a secondary priority.  It's possible a query scheduler might
> have special needs.  For example, quickstep wants a dedicated "master
> scheduler" thread which cannot be stolen from.
> 
>> Some code paths might use this low-level task API directly
> 
> Yes, I have no intention of completely shadowing the lower level
> executor, only adding optional utilities useful for building
> pipelines.
> 
>> I've brought this up in the past, but if we are comfortable with more threads than CPU cores...let me know if it doesn't make sense.
> 
> It makes sense and other projects (e.g. C#, postgres) use this kind of
> "more threads than CPU" model.  It can be simpler because it doesn't
> require you to split a long running job with blocking waits into
> smaller jobs but it won't offer more capability/functionality.  Both
> the quickstep model and the asynchronous generators model have
> mechanisms in place so that "follow-up tasks" which are ready to run
> will run immediately without going to the back of the queue.  This is
> done by running a callback immediately instead of submitting it to the
> thread pool.
> 
> The downsides of the "more threads" approach are:
> * It does not support the single thread case well
> * It puts scheduling in the hands of the OS.  It can actually prevent
> the benefit you described.  If you create a new thread / task pool to
> address some "run right away" tasks then there is no guarantee the new
> thread will be given a core by the OS.
> * It often leads to more communication between threads within a task
> which means more potential for race conditions (this last point is a
> little subjective but matches my personal experience)
> 
>>  If there is a mechanism for async task producers to coexist alongside with code that manually manages the execution order of tasks generated by its task graph (thinking of query engine code here a la Quickstep)
> 
> Yes.  Any of these "small tasks" models work together pretty well.
> I've taken a decent look at quickstep and I am confident it will be
> compatible.  Many of these tasks will serve as input jobs for any kind
> of query engine so it will need to be.
> 
> From Micah:
> 
>> One thing that I don't quite understand with this proposal is the scope?
> 
> My primary scope is to get blocking I/O waits off of the CPU thread
> pool (nested parallelism and potentially improved pipelining are bonus
> benefits).
> 
>> Is the intention that most APIs will eventually work with Futures instead of raw return values
> 
> If a method could potentially run some kind of long term blocking I/O
> wait then yes.  So reading / writing tables & datasets, IPC,
> filesystem APIs, etc. will all need to adapt.  It doesn't have to be
> all at once.  CPU only functions would remain as they are.  So table
> manipulation, compute functions, etc. would remain as they are.  For
> example, there would never be any advantage to creating an
> asynchronous method to drop a column from a table.
> 
> Publicly, things can remain as they are to achieve my primary goal.
> Once that is complete it could be a feature enhancement to allow an
> asynchronous alternative.  It's useful for those that are already
> doing asynchronous code.  In that scenario I'd propose following the
> .NET model and having duplicate APIs (e.g. Read and ReadAsync).
> 
> Internally, duplicate APIs will probably be needed as things migrate
> but once more pieces move to an asynchronous model then the
> synchronous API could go away.  It's hard to put any kind of number on
> something but if you look at the CSV reader then probably 10% or less
> of the code path (in units of "lines of code") is in methods returning
> futures.  Probably much smaller once you consider all the code behind
> the parsing / inferring / type handling / array building.  If you
> think of your code as a tree structure where the nodes are methods and
> the edges are calls, the asynchronous path is a fairly straight line from
> the topmost API call to some I/O handoff lower down the tree.  All of
> the code in the "branches" on either side of this path is synchronous.
> 
> On Mon, Feb 15, 2021 at 6:49 PM Micah Kornfield <em...@gmail.com> wrote:
>>
>> I took a pass through this, thank you for a good discussion of the
>> alternative.  One thing that I don't quite understand with this proposal is
>> the scope?  Is the intention that most APIs will eventually work with
>> Futures instead of raw return values (i.e. returning a Table or Record
>> batch will never be a thing, but instead you get references to
>> Future<Table>)?
>>
>> Thanks,
>> Micah
>>
>> On Mon, Feb 15, 2021 at 2:15 PM Wes McKinney <we...@gmail.com> wrote:
>>
>>> hi Weston,
>>>
>>> Thanks for putting this comprehensive and informative document together.
>>>
>>> There are several layers of problems to consider, just thinking out loud:
>>>
>>> * I hypothesize that the bottom of the stack is a thread pool with a
>>> queue-per-thread that implements work stealing. Some code paths might
>>> use this low-level task API directly, for example a workload putting
>>> all of its tasks into one particular queue and letting the other
>>> threads take work if they are idle.
>>>
>>> * I've brought this up in the past, but if we are comfortable with
>>> more threads than CPU cores, we may allow for the base level thread
>>> pool to be expanded dynamically. The tradeoff here is coarse
>>> granularity context switching between tasks only at time of task
>>> completion vs. the OS context-switching mid-task between threads. For
>>> example, if there is a code path which wishes to guarantee that a
>>> thread is being put to work right away to execute its tasks, even if
>>> all of the other queues are full of other tasks, then this could
>>> partially address the task prioritization problem discussed in the
>>> document. If there is a notion of a "task producer" or a "workload"
>>> and then the number of task producers exceeds the size of the thread
>>> pool, then an additional thread+dedicated task queue for that thread
>>> could be created to handle tasks submitted by the producer. Maybe this
>>> is a bad idea (I'm not an expert in this domain after all), let me
>>> know if it doesn't make sense.
>>>
>>> * I agree that we should encourage as much code as possible to use the
>>> asynchronous model — per above, if there is a mechanism for async task
>>> producers to coexist alongside with code that manually manages the
>>> execution order of tasks generated by its task graph (thinking of
>>> query engine code here a la Quickstep), then that might be good.
>>>
>>> Lots to do here but excited to see things evolve here and see the
>>> project grow faster and more scalable on systems with a lot of cores
>>> that do a lot of mixed IO/CPU work!
>>>
>>> - Wes
>>>
>>> On Tue, Feb 2, 2021 at 9:02 PM Weston Pace <we...@gmail.com> wrote:
>>>>
>>>> This is a follow up to a discussion from last September [3].  I've
>>>> been investigating Arrow's use of threading and I/O and I believe
>>>> there are some improvements that could be made.  Arrow is currently
>>>> supporting two threading options (single thread and "per-core" thread
>>>> pool).  Both of these approaches are hindered if blocking I/O is
>>>> performed on a CPU worker thread.
>>>>
>>>> It is somewhat alleviated by using background threads for I/O (in the
>>>> readahead iterator) but this implementation is not complete and does
>>>> not allow for nested parallelism.  I would like to convert Arrow's I/O
>>>> operations to an asynchronous model (expanding on the existing futures
>>>> API).  I have already converted the CSV reader in this fashion [2] as
>>>> a proof of concept.
>>>>
>>>> I have written a more detailed proposal here [1].  Please feel free to
>>>> suggest improvements or alternate approaches.  Also, please let me
>>>> know if I missed any goals or considerations I should keep in mind.
>>>>
>>>> Also, hello, this email is a bit of an introduction.  I have
>>>> previously made one or two small comments/changes but I am hoping to
>>>> be more involved going forwards.  I've mostly worked on proprietary
>>>> test and measurement software but have recently joined Ursa Computing
>>>> which will allow me more time to work on Arrow.
>>>>
>>>> Thanks,
>>>>
>>>> Weston Pace
>>>>
>>>> [1]
>>> https://docs.google.com/document/d/1tO2WwYL-G2cB_MCPqYguKjKkRT7mZ8C2Gc9ONvspfgo/edit?usp=sharing
>>>> [2] https://github.com/apache/arrow/pull/9095
>>>> [3]
>>> https://mail-archives.apache.org/mod_mbox/arrow-dev/202009.mbox/%3CCAJPUwMDmU3rFt6Upyis%3DyXB%3DECkmrjdncgR9xj%3DDFapJt9FfUg%40mail.gmail.com%3E
>>>

Re: Threading Improvements Proposal

Posted by Weston Pace <we...@gmail.com>.
Thanks for the input.  I appreciate you both taking the time to look
through this.  I'll consolidate the points here.

From Wes:

> I hypothesize that the bottom of the stack is a thread pool with a
> queue-per-thread that implements work stealing.

Yes.  I think there may be two pools (one for I/O and one for CPU) but
for the work I'm intending (unblocking deadlock and getting I/O off
CPU pool) what is there today is sufficient.  Moving to work stealing
would be a secondary priority.  It's possible a query scheduler might
have special needs.  For example, quickstep wants a dedicated "master
scheduler" thread which cannot be stolen from.
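
For illustration only, the two-pool idea can be sketched in Python with
the standard concurrent.futures module.  This is not Arrow code: the
pool names, the pool sizes, and the blocking_read / parse helpers are
assumptions made up for the example.  Blocking waits are confined to
the I/O pool, so the small CPU pool only receives work that is ready
to run.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical pools: names and sizes are illustrative, not Arrow's.
io_pool = ThreadPoolExecutor(max_workers=8, thread_name_prefix="io")
cpu_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="cpu")


def blocking_read(chunk_id):
    """Stand-in for a blocking I/O call (e.g. a file or network read)."""
    time.sleep(0.05)  # simulated I/O wait; the thread is idle here
    return f"{chunk_id},1,2,3"


def parse(raw):
    """Stand-in for CPU-only work (parsing one CSV-like line)."""
    return [int(x) for x in raw.split(",")]


def read_and_parse(chunk_ids):
    # Start every read up front; the waits overlap on the I/O pool.
    raw_futures = [io_pool.submit(blocking_read, i) for i in chunk_ids]
    # The caller waits for each read, then hands the ready data to the
    # CPU pool, so CPU workers never block on I/O.
    parsed_futures = [cpu_pool.submit(parse, f.result()) for f in raw_futures]
    return [f.result() for f in parsed_futures]
```

Calling read_and_parse([0, 1, 2]) overlaps the three simulated reads
on the I/O pool while the two CPU workers stay free for parsing.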

> Some code paths might use this low-level task API directly

Yes, I have no intention of completely shadowing the lower level
executor, only adding optional utilities useful for building
pipelines.

> I've brought this up in the past, but if we are comfortable with more threads than CPU cores...let me know if it doesn't make sense.

It makes sense and other projects (e.g. C#, postgres) use this kind of
"more threads than CPU" model.  It can be simpler because it doesn't
require you to split a long running job with blocking waits into
smaller jobs but it won't offer more capability/functionality.  Both
the quickstep model and the asynchronous generators model have
mechanisms in place so that "follow-up tasks" which are ready to run
will run immediately without going to the back of the queue.  This is
done by running a callback immediately instead of submitting it to the
thread pool.
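
A small Python sketch of that difference, using only the standard
library (it is not the quickstep or Arrow mechanism; the function
names and the one-thread pool are assumptions chosen to make the
ordering visible).  A follow-up either runs inline in the completion
callback or is resubmitted to the pool, where it lands behind work
that is already queued.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run(resubmit):
    """Return the order in which work ran on a one-thread pool."""
    order = []
    gate = threading.Event()      # holds "first" until everything is queued
    finished = threading.Event()  # set once the follow-up has run

    with ThreadPoolExecutor(max_workers=1) as pool:
        def first():
            gate.wait()
            order.append("first")

        def other():
            order.append("other")

        def follow_up():
            order.append("follow-up")
            finished.set()

        def on_done(_future):
            if resubmit:
                # Goes to the back of the queue, behind "other".
                pool.submit(follow_up)
            else:
                # Runs right away on the thread that completed "first".
                follow_up()

        pool.submit(first).add_done_callback(on_done)
        pool.submit(other)
        gate.set()
        finished.wait()  # keep the pool open until the follow-up has run
    return order
```

With resubmit=False the follow-up runs before the unrelated "other"
task; with resubmit=True it runs after it.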

The downsides of the "more threads" approach are:
* It does not support the single thread case well
* It puts scheduling in the hands of the OS.  It can actually prevent
the benefit you described.  If you create a new thread / task pool to
address some "run right away" tasks then there is no guarantee the new
thread will be given a core by the OS.
* It often leads to more communication between threads within a task
which means more potential for race conditions (this last point is a
little subjective but matches my personal experience)

>  If there is a mechanism for async task producers to coexist alongside with code that manually manages the execution order of tasks generated by its task graph (thinking of query engine code here a la Quickstep)

Yes.  Any of these "small tasks" models work together pretty well.
I've taken a decent look at quickstep and I am confident it will be
compatible.  Many of these tasks will serve as input jobs for any kind
of query engine so it will need to be.

From Micah:

> One thing that I don't quite understand with this proposal is the scope?

My primary scope is to get blocking I/O waits off of the CPU thread
pool (nested parallelism and potentially improved pipelining are bonus
benefits).

> Is the intention that most APIs will eventually work with Futures instead of raw return values

If a method could potentially run some kind of long term blocking I/O
wait then yes.  So reading / writing tables & datasets, IPC,
filesystem APIs, etc. will all need to adapt.  It doesn't have to be
all at once.  CPU only functions would remain as they are.  So table
manipulation, compute functions, etc. would remain as they are.  For
example, there would never be any advantage to creating an
asynchronous method to drop a column from a table.

Publicly, things can remain as they are to achieve my primary goal.
Once that is complete it could be a feature enhancement to allow an
asynchronous alternative.  It's useful for those that are already
doing asynchronous code.  In that scenario I'd propose following the
.NET model and having duplicate APIs (e.g. Read and ReadAsync).
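
As a rough sketch of what such a pair could look like (in Python
rather than C++, and with a made-up TableReader class, a dict standing
in for a filesystem, and hypothetical read / read_async names), the
synchronous method can be a thin facade that simply waits on the
future returned by the asynchronous one:

```python
from concurrent.futures import Future, ThreadPoolExecutor


class TableReader:
    """Hypothetical reader used only for illustration; not an Arrow class."""

    def __init__(self, io_pool, storage):
        self._io_pool = io_pool
        self._storage = storage  # dict standing in for a filesystem

    def _load(self, path):
        # Stand-in for the blocking I/O call.
        return self._storage[path]

    def read_async(self, path) -> Future:
        """Asynchronous variant: returns immediately with a future."""
        return self._io_pool.submit(self._load, path)

    def read(self, path):
        """Synchronous facade: same work, but the caller blocks on it."""
        return self.read_async(path).result()


io_pool = ThreadPoolExecutor(max_workers=4)
reader = TableReader(io_pool, {"a.csv": "x,y\n1,2\n"})
```

Callers that do not want to deal with futures use reader.read("a.csv");
callers that are already asynchronous use reader.read_async("a.csv")
and attach their own continuation.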

Internally, duplicate APIs will probably be needed as things migrate
but once more pieces move to an asynchronous model then the
synchronous API could go away.  It's hard to put any kind of number on
something but if you look at the CSV reader then probably 10% or less
of the code path (in units of "lines of code") is in methods returning
futures.  Probably much smaller once you consider all the code behind
the parsing / inferring / type handling / array building.  If you
think of your code as a tree structure where the nodes are methods and
the edges are calls, the asynchronous path is a fairly straight line from
the topmost API call to some I/O handoff lower down the tree.  All of
the code in the "branches" on either side of this path is synchronous.
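
A toy Python illustration of that shape, using asyncio (this is not
the CSV reader from the PR; every function name here is invented for
the example).  Only the top-level call and the I/O handoff are
asynchronous; the parsing and type-inference "branches" are ordinary
synchronous functions.

```python
import asyncio


def parse_block(raw):
    """Synchronous branch: split text into rows of string fields."""
    return [line.split(",") for line in raw.splitlines()]


def infer_types(rows):
    """Synchronous branch: convert all-digit fields to int."""
    return [[int(v) if v.isdigit() else v for v in row] for row in rows]


async def read_block(source):
    """Bottom of the asynchronous path: the I/O handoff."""
    await asyncio.sleep(0)  # stand-in for awaiting a real read
    return source


async def read_csv_async(source):
    """Topmost API call: the only other coroutine on the path."""
    raw = await read_block(source)
    return infer_types(parse_block(raw))
```

Running asyncio.run(read_csv_async("a,1\nb,2")) exercises the whole
path; two of the four functions are coroutines, and in a real reader
the synchronous share would be far larger.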

On Mon, Feb 15, 2021 at 6:49 PM Micah Kornfield <em...@gmail.com> wrote:
>
> I took a pass through this, thank you for a good discussion of the
> alternative.  One thing that I don't quite understand with this proposal is
> the scope?  Is the intention that most APIs will eventually work with
> Futures instead of raw return values (i.e. returning a Table or Record
> batch will never be a thing, but instead you get references to
> Future<Table>)?
>
> Thanks,
> Micah
>
> On Mon, Feb 15, 2021 at 2:15 PM Wes McKinney <we...@gmail.com> wrote:
>
> > hi Weston,
> >
> > Thanks for putting this comprehensive and informative document together.
> >
> > There are several layers of problems to consider, just thinking out loud:
> >
> > * I hypothesize that the bottom of the stack is a thread pool with a
> > queue-per-thread that implements work stealing. Some code paths might
> > use this low-level task API directly, for example a workload putting
> > all of its tasks into one particular queue and letting the other
> > threads take work if they are idle.
> >
> > * I've brought this up in the past, but if we are comfortable with
> > more threads than CPU cores, we may allow for the base level thread
> > pool to be expanded dynamically. The tradeoff here is coarse
> > granularity context switching between tasks only at time of task
> > completion vs. the OS context-switching mid-task between threads. For
> > example, if there is a code path which wishes to guarantee that a
> > thread is being put to work right away to execute its tasks, even if
> > all of the other queues are full of other tasks, then this could
> > partially address the task prioritization problem discussed in the
> > document. If there is a notion of a "task producer" or a "workload"
> > and then the number of task producers exceeds the size of the thread
> > pool, then an additional thread+dedicated task queue for that thread
> > could be created to handle tasks submitted by the producer. Maybe this
> > is a bad idea (I'm not an expert in this domain after all), let me
> > know if it doesn't make sense.
> >
> > * I agree that we should encourage as much code as possible to use the
> > asynchronous model — per above, if there is a mechanism for async task
> > producers to coexist alongside with code that manually manages the
> > execution order of tasks generated by its task graph (thinking of
> > query engine code here a la Quickstep), then that might be good.
> >
> > Lots to do here but excited to see things evolve here and see the
> > project grow faster and more scalable on systems with a lot of cores
> > that do a lot of mixed IO/CPU work!
> >
> > - Wes
> >
> > On Tue, Feb 2, 2021 at 9:02 PM Weston Pace <we...@gmail.com> wrote:
> > >
> > > This is a follow up to a discussion from last September [3].  I've
> > > been investigating Arrow's use of threading and I/O and I believe
> > > there are some improvements that could be made.  Arrow is currently
> > > supporting two threading options (single thread and "per-core" thread
> > > pool).  Both of these approaches are hindered if blocking I/O is
> > > performed on a CPU worker thread.
> > >
> > > It is somewhat alleviated by using background threads for I/O (in the
> > > readahead iterator) but this implementation is not complete and does
> > > not allow for nested parallelism.  I would like to convert Arrow's I/O
> > > operations to an asynchronous model (expanding on the existing futures
> > > API).  I have already converted the CSV reader in this fashion [2] as
> > > a proof of concept.
> > >
> > > I have written a more detailed proposal here [1].  Please feel free to
> > > suggest improvements or alternate approaches.  Also, please let me
> > > know if I missed any goals or considerations I should keep in mind.
> > >
> > > Also, hello, this email is a bit of an introduction.  I have
> > > previously made one or two small comments/changes but I am hoping to
> > > be more involved going forwards.  I've mostly worked on proprietary
> > > test and measurement software but have recently joined Ursa Computing
> > > which will allow me more time to work on Arrow.
> > >
> > > Thanks,
> > >
> > > Weston Pace
> > >
> > > [1]
> > https://docs.google.com/document/d/1tO2WwYL-G2cB_MCPqYguKjKkRT7mZ8C2Gc9ONvspfgo/edit?usp=sharing
> > > [2] https://github.com/apache/arrow/pull/9095
> > > [3]
> > https://mail-archives.apache.org/mod_mbox/arrow-dev/202009.mbox/%3CCAJPUwMDmU3rFt6Upyis%3DyXB%3DECkmrjdncgR9xj%3DDFapJt9FfUg%40mail.gmail.com%3E
> >

Re: Threading Improvements Proposal

Posted by Micah Kornfield <em...@gmail.com>.
I took a pass through this, thank you for a good discussion of the
alternatives.  One thing that I don't quite understand with this proposal is
the scope.  Is the intention that most APIs will eventually work with
Futures instead of raw return values (i.e. returning a Table or
RecordBatch will never be a thing, but instead you get references to
Future<Table>)?
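
To make the question concrete, here is a minimal sketch of one possible
answer, where a blocking call is kept as a thin wrapper around a
future-returning one. It is written in Python for brevity and is not
Arrow's API: read_table_async, read_table, _blocking_read and the Table
stand-in are all hypothetical names used only for illustration.

```python
from concurrent.futures import Future, ThreadPoolExecutor

# Hypothetical stand-in for an Arrow table; not the real pyarrow.Table.
Table = dict

# Dedicated pool for blocking I/O, so CPU workers are never parked on a read.
_io_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="io")


def _blocking_read(source: str) -> Table:
    # Placeholder for a blocking read followed by a decode step.
    return {"source": source, "num_rows": 3}


def read_table_async(source: str) -> "Future[Table]":
    """Asynchronous variant: returns immediately with a future."""
    return _io_pool.submit(_blocking_read, source)


def read_table(source: str) -> Table:
    """Synchronous variant: a thin wrapper that waits on the future."""
    return read_table_async(source).result()
```

Under this shape, callers that want a raw Table keep calling read_table,
while callers that want to compose work use the future-returning variant.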

Thanks,
Micah

On Mon, Feb 15, 2021 at 2:15 PM Wes McKinney <we...@gmail.com> wrote:

> hi Weston,
>
> Thanks for putting this comprehensive and informative document together.
>
> There are several layers of problems to consider, just thinking out loud:
>
> * I hypothesize that the bottom of the stack is a thread pool with a
> queue-per-thread that implements work stealing. Some code paths might
> use this low-level task API directly, for example a workload putting
> all of its tasks into one particular queue and letting the other
> threads take work if they are idle.
>
> * I've brought this up in the past, but if we are comfortable with
> more threads than CPU cores, we may allow for the base level thread
> pool to be expanded dynamically. The tradeoff here is coarse
> granularity context switching between tasks only at time of task
> completion vs. the OS context-switching mid-task between threads. For
> example, if there is a code path which wishes to guarantee that a
> thread is being put to work right away to execute its tasks, even if
> all of the other queues are full of other tasks, then this could
> partially address the task prioritization problem discussed in the
> document. If there is a notion of a "task producer" or a "workload"
> and the number of task producers exceeds the size of the thread
> pool, then an additional thread plus a dedicated task queue for that
> thread could be created to handle tasks submitted by the producer. Maybe this
> is a bad idea (I'm not an expert in this domain after all), let me
> know if it doesn't make sense.
>
> * I agree that we should encourage as much code as possible to use the
> asynchronous model. Per above, if there is a mechanism for async task
> producers to coexist alongside code that manually manages the
> execution order of tasks generated by its task graph (thinking of
> query engine code here a la Quickstep), then that might be good.
>
> Lots to do here, but I'm excited to see things evolve and to see the
> project grow faster and more scalable on systems with a lot of cores
> that do a lot of mixed IO/CPU work!
>
> - Wes
>

Re: Threading Improvements Proposal

Posted by Wes McKinney <we...@gmail.com>.
hi Weston,

Thanks for putting this comprehensive and informative document together.

There are several layers of problems to consider, just thinking out loud:

* I hypothesize that the bottom of the stack is a thread pool with a
queue-per-thread that implements work stealing. Some code paths might
use this low-level task API directly, for example a workload putting
all of its tasks into one particular queue and letting the other
threads take work if they are idle.

* I've brought this up in the past, but if we are comfortable with
more threads than CPU cores, we may allow for the base level thread
pool to be expanded dynamically. The tradeoff here is coarse
granularity context switching between tasks only at time of task
completion vs. the OS context-switching mid-task between threads. For
example, if there is a code path which wishes to guarantee that a
thread is being put to work right away to execute its tasks, even if
all of the other queues are full of other tasks, then this could
partially address the task prioritization problem discussed in the
document. If there is a notion of a "task producer" or a "workload"
and the number of task producers exceeds the size of the thread
pool, then an additional thread plus a dedicated task queue for that
thread could be created to handle tasks submitted by the producer. Maybe this
is a bad idea (I'm not an expert in this domain after all), let me
know if it doesn't make sense.

* I agree that we should encourage as much code as possible to use the
asynchronous model. Per above, if there is a mechanism for async task
producers to coexist alongside code that manually manages the
execution order of tasks generated by its task graph (thinking of
query engine code here a la Quickstep), then that might be good.
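
The first point above (a pool with a queue per thread plus work
stealing) can be sketched roughly as follows. This is a toy in Python,
not Arrow's thread pool: WorkStealingPool and its methods are
hypothetical names, a single lock guards every queue for simplicity, and
each future resolves to a (result, worker index) pair purely so the
caller can see which thread ran the task.

```python
import threading
from collections import deque
from concurrent.futures import Future


class WorkStealingPool:
    """Toy pool: one deque per worker; idle workers steal from the others."""

    def __init__(self, num_workers: int) -> None:
        self._queues = [deque() for _ in range(num_workers)]
        # One condition guards every queue. A real pool would use per-queue
        # locks or lock-free deques; this keeps the sketch easy to verify.
        self._cond = threading.Condition()
        self._closed = False
        self._threads = [
            threading.Thread(target=self._run, args=(i,), daemon=True)
            for i in range(num_workers)
        ]
        for t in self._threads:
            t.start()

    def submit(self, fn, *args, queue: int = 0) -> Future:
        """Place fn(*args) on one specific worker's queue."""
        fut = Future()
        with self._cond:
            if self._closed:
                raise RuntimeError("pool is shut down")
            self._queues[queue].append((fut, fn, args))
            self._cond.notify_all()
        return fut

    def _take(self, me: int):
        # Caller holds the lock. Prefer the newest task on our own queue,
        # otherwise steal the oldest task from some other queue.
        if self._queues[me]:
            return self._queues[me].pop()
        for other, q in enumerate(self._queues):
            if other != me and q:
                return q.popleft()
        return None

    def _run(self, me: int) -> None:
        while True:
            with self._cond:
                item = self._take(me)
                while item is None:
                    if self._closed:
                        return
                    self._cond.wait()
                    item = self._take(me)
            fut, fn, args = item
            try:
                result = fn(*args)
            except Exception as exc:
                fut.set_exception(exc)
            else:
                fut.set_result((result, me))

    def shutdown(self) -> None:
        """Let workers drain the remaining tasks, then join them."""
        with self._cond:
            self._closed = True
            self._cond.notify_all()
        for t in self._threads:
            t.join()


if __name__ == "__main__":
    pool = WorkStealingPool(4)
    # A workload that puts everything on queue 0; idle workers may steal.
    futures = [pool.submit(pow, n, 2, queue=0) for n in range(8)]
    print([f.result() for f in futures])
    pool.shutdown()
```

Submitting everything with queue=0 mirrors the "one particular queue"
workload: ordering stays under the producer's control while other
threads pick up work only when they would otherwise be idle.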

Lots to do here, but I'm excited to see things evolve and to see the
project grow faster and more scalable on systems with a lot of cores
that do a lot of mixed IO/CPU work!

- Wes
