You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@arrow.apache.org by "Malakhov, Anton" <an...@intel.com> on 2019/07/12 21:21:38 UTC

RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

Hi, folks

We were discussing improvements for the threading engine back in May and agreed to implement benchmarks (sorry, I've lost the original mail thread, here is the link: https://lists.apache.org/thread.html/c690253d0bde643a5b644af70ec1511c6e510ebc86cc970aa8d5252e@%3Cdev.arrow.apache.org%3E )

Here is update of what's going on with this effort.
We've implemented a rough prototype for group_by, aggregate, and transform execution nodes on top of Arrow (along with studying the whole data analytics domain along the way :-) ) and made them parallel, as you can see in this repository: https://github.com/anton-malakhov/nyc_taxi

The result is that all these execution nodes scale well enough and run under 100 milliseconds on my 2 x Xeon E5-2650 v4 @ 2.20GHz, 128Gb RAM while CSV reader takes several seconds to complete even reading from in-memory file (8Gb), thus it is not IO bound yet even with good consumer-grade SSDs. Thus my focus recently has been around optimization of CSV parser where I have achieved 50% improvement substituting all the small object allocations via TBB scalable allocator and using TBB-based memory pool instead of default one with pre-allocated huge (2Mb) memory pages (echo 30000 > /proc/sys/vm/nr_hugepages). I found no way yet how to do both of these tricks with jemalloc, so please try to beat or meet my times without TBB allocator. I also see other hotspots and opportunities for optimizations, some examples are memset is being heavily used while resizing buffers (why and why?) and the column builder trashes caches by not using of streaming stores.

I used TBB directly to make the execution nodes parallel, however I have also implemented a simple TBB-based ThreadPool and TaskGroup as you can see in this PR: https://github.com/aregm/arrow/pull/6
I see consistent improvement (up to 1200%!) on BM_ThreadedTaskGroup and BM_ThreadPoolSpawn microbenchmarks, however applying it to the real world task of CSV reader, I don't see any improvements yet. Or even worse, while reading the file, TBB wastes some cycles spinning.. probably because of read-ahead thread, which oversubscribes the machine. Arrow's threading better interacts with OS scheduler thus shows better performance. So, this simple approach to TBB without a deeper redesign didn't help. I'll be looking into applying more sophisticated NUMA and locality-aware tricks as I'll be cleaning paths for the data streams in the parser. Though, I'll take some time off before returning to this effort. See you in September!


Regards,
// Anton


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Wes McKinney <we...@gmail.com>.
hi Anton,

Ideally PRs like https://github.com/aregm/arrow/pull/6 would be made
into apache/arrow where the community can see them. I had no idea this
PR existed.

I have looked at the demo repository a little bit, but I'm not sure
what conclusions it will help reach. What we are missing at the moment
is a richer programming model / API for multi-threaded workflows that
do a mix of CPU and IO. If that programming model can make use of TBB
for better performance (but without exposing a lot of TBB-specific
details to the Arrow library developer), then that is great.

In any case, I'd prefer to collaborate in the context of the Arrow C++
library and the specific problems we are solving there. I haven't had
much time lately to write new code but one of the thing I'm most
interested in working on in the near future is the C++ Data Frame
library: https://docs.google.com/document/d/1XHe_j87n2VHGzEbnLe786GHbbcbrzbjgG8D0IXWAeHg/edit?usp=sharing.
That might be a good place to experiment with a better threading /
work-scheduling API to make things simpler for implementers.

- Wes

On Fri, Jul 12, 2019 at 4:21 PM Malakhov, Anton
<an...@intel.com> wrote:
>
> Hi, folks
>
> We were discussing improvements for the threading engine back in May and agreed to implement benchmarks (sorry, I've lost the original mail thread, here is the link: https://lists.apache.org/thread.html/c690253d0bde643a5b644af70ec1511c6e510ebc86cc970aa8d5252e@%3Cdev.arrow.apache.org%3E )
>
> Here is update of what's going on with this effort.
> We've implemented a rough prototype for group_by, aggregate, and transform execution nodes on top of Arrow (along with studying the whole data analytics domain along the way :-) ) and made them parallel, as you can see in this repository: https://github.com/anton-malakhov/nyc_taxi
>
> The result is that all these execution nodes scale well enough and run under 100 milliseconds on my 2 x Xeon E5-2650 v4 @ 2.20GHz, 128Gb RAM while CSV reader takes several seconds to complete even reading from in-memory file (8Gb), thus it is not IO bound yet even with good consumer-grade SSDs. Thus my focus recently has been around optimization of CSV parser where I have achieved 50% improvement substituting all the small object allocations via TBB scalable allocator and using TBB-based memory pool instead of default one with pre-allocated huge (2Mb) memory pages (echo 30000 > /proc/sys/vm/nr_hugepages). I found no way yet how to do both of these tricks with jemalloc, so please try to beat or meet my times without TBB allocator. I also see other hotspots and opportunities for optimizations, some examples are memset is being heavily used while resizing buffers (why and why?) and the column builder trashes caches by not using of streaming stores.
>
> I used TBB directly to make the execution nodes parallel, however I have also implemented a simple TBB-based ThreadPool and TaskGroup as you can see in this PR: https://github.com/aregm/arrow/pull/6
> I see consistent improvement (up to 1200%!) on BM_ThreadedTaskGroup and BM_ThreadPoolSpawn microbenchmarks, however applying it to the real world task of CSV reader, I don't see any improvements yet. Or even worse, while reading the file, TBB wastes some cycles spinning.. probably because of read-ahead thread, which oversubscribes the machine. Arrow's threading better interacts with OS scheduler thus shows better performance. So, this simple approach to TBB without a deeper redesign didn't help. I'll be looking into applying more sophisticated NUMA and locality-aware tricks as I'll be cleaning paths for the data streams in the parser. Though, I'll take some time off before returning to this effort. See you in September!
>
>
> Regards,
> // Anton
>

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Wes McKinney <we...@gmail.com>.
Useful read on this topic today from the Julia language

https://julialang.org/blog/2019/07/multithreading

On Tue, Jul 23, 2019, 12:22 AM Jacques Nadeau <ja...@apache.org> wrote:

> There are two main things that have been important to us in Dremio around
> threading:
>
> Separate threading model from algorithms. We chose to do parallelization at
> the engine level instead of the operation level. This allows us to
> substantially increase parallelization while still maintaining a strong
> thread prioritization model. This contrasts to some systems like Apache
> Impala which chose to implement threading at the operation level. This has
> ultimately hurt their ability for individual workloads to scale out within
> a node. See the experimental features around MT_DOP when the tried to
> retreat from this model and struggled to do so. It serves as an example of
> the challenges if you don't separate data algorithms from threading early
> on in design [1]. This intention was core to how we designed Gandiva, where
> an external driver makes decisions around threading and the actual
> algorithm only does small amounts of work before yielding to the driver.
> This allows a driver to make parallelization and scheduling decisions
> without having to know the internals of the algorithm. (In Dremio, these
> are all covered under the interfaces described in Operator [2] and it's
> subclasses that together provide a very simple state of operation states
> for the driver to understand.
>
> The second is that the majority of the data we work with these days is
> primarily in high latency cloud storage. While we may stage data locally, a
> huge amount of reads are impacted by the performance of cloud stores. To
> cover these performance behaviors we did two things, the first was
> introduce a very simple to use  async reading interface for data, seen at
> [3] and introduce a collaborative way that individual tasks could declare
> their blocking state to a central coordinator [4]. Happy to cover these in
> more detail if people are interested. In general, using these techniques
> have allowed us to tune many systems to a situation where the (highly)
> variable latency of cloud stores like S3 and ADLS can be mostly cloaked by
> aggressive read ahead and what we call predictive pipelining (where reading
> is guided based on latency performance characteristics along with knowledge
> of columnar formats like Parquet).
>
> [1]
>
> https://www.cloudera.com/documentation/enterprise/latest/topics/impala_mt_dop.html#mt_dop
> [2]
>
> https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/sabot/op/spi/Operator.java
> [3]
>
> https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/exec/store/dfs/async/AsyncByteReader.java
> [4]
>
> https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/sabot/threads/sharedres/SharedResourceManager.java
>
> On Mon, Jul 22, 2019 at 9:56 AM Antoine Pitrou <an...@python.org> wrote:
>
> >
> > Le 22/07/2019 à 18:52, Wes McKinney a écrit :
> > >
> > > Probably the way is to introduce async-capable read APIs into the file
> > > interfaces. For example:
> > >
> > > file->ReadAsyncBlock(thread_ctx, ...);
> > >
> > > That way the file implementation can decide whether asynchronous logic
> > > is actually needed.
> > > I doubt very much that a one-size-fits-all
> > > concurrency solution can be developed -- in some applications
> > > coarse-grained IO and CPU task scheduling may be warranted, but we
> > > need to have a solution for finer-grained scenarios where
> > >
> > > * In the memory-mapped case, there is no overhead and
> > > * The programming model is not too burdensome to the library developer
> >
> > Well, the asynchronous I/O programming model *will* be burdensome at
> > least until C++ gets coroutines (which may happen in C++20, and
> > therefore be usable somewhere around 2024 for Arrow?).
> >
> > Regards
> >
> > Antoine.
> >
>

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Jacques Nadeau <ja...@apache.org>.
There are two main things that have been important to us in Dremio around
threading:

Separate threading model from algorithms. We chose to do parallelization at
the engine level instead of the operation level. This allows us to
substantially increase parallelization while still maintaining a strong
thread prioritization model. This contrasts to some systems like Apache
Impala which chose to implement threading at the operation level. This has
ultimately hurt their ability for individual workloads to scale out within
a node. See the experimental features around MT_DOP when the tried to
retreat from this model and struggled to do so. It serves as an example of
the challenges if you don't separate data algorithms from threading early
on in design [1]. This intention was core to how we designed Gandiva, where
an external driver makes decisions around threading and the actual
algorithm only does small amounts of work before yielding to the driver.
This allows a driver to make parallelization and scheduling decisions
without having to know the internals of the algorithm. (In Dremio, these
are all covered under the interfaces described in Operator [2] and it's
subclasses that together provide a very simple state of operation states
for the driver to understand.

The second is that the majority of the data we work with these days is
primarily in high latency cloud storage. While we may stage data locally, a
huge amount of reads are impacted by the performance of cloud stores. To
cover these performance behaviors we did two things, the first was
introduce a very simple to use  async reading interface for data, seen at
[3] and introduce a collaborative way that individual tasks could declare
their blocking state to a central coordinator [4]. Happy to cover these in
more detail if people are interested. In general, using these techniques
have allowed us to tune many systems to a situation where the (highly)
variable latency of cloud stores like S3 and ADLS can be mostly cloaked by
aggressive read ahead and what we call predictive pipelining (where reading
is guided based on latency performance characteristics along with knowledge
of columnar formats like Parquet).

[1]
https://www.cloudera.com/documentation/enterprise/latest/topics/impala_mt_dop.html#mt_dop
[2]
https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/sabot/op/spi/Operator.java
[3]
https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/exec/store/dfs/async/AsyncByteReader.java
[4]
https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/sabot/threads/sharedres/SharedResourceManager.java

On Mon, Jul 22, 2019 at 9:56 AM Antoine Pitrou <an...@python.org> wrote:

>
> Le 22/07/2019 à 18:52, Wes McKinney a écrit :
> >
> > Probably the way is to introduce async-capable read APIs into the file
> > interfaces. For example:
> >
> > file->ReadAsyncBlock(thread_ctx, ...);
> >
> > That way the file implementation can decide whether asynchronous logic
> > is actually needed.
> > I doubt very much that a one-size-fits-all
> > concurrency solution can be developed -- in some applications
> > coarse-grained IO and CPU task scheduling may be warranted, but we
> > need to have a solution for finer-grained scenarios where
> >
> > * In the memory-mapped case, there is no overhead and
> > * The programming model is not too burdensome to the library developer
>
> Well, the asynchronous I/O programming model *will* be burdensome at
> least until C++ gets coroutines (which may happen in C++20, and
> therefore be usable somewhere around 2024 for Arrow?).
>
> Regards
>
> Antoine.
>

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Antoine Pitrou <an...@python.org>.
Le 22/07/2019 à 18:52, Wes McKinney a écrit :
> 
> Probably the way is to introduce async-capable read APIs into the file
> interfaces. For example:
> 
> file->ReadAsyncBlock(thread_ctx, ...);
> 
> That way the file implementation can decide whether asynchronous logic
> is actually needed.
> I doubt very much that a one-size-fits-all
> concurrency solution can be developed -- in some applications
> coarse-grained IO and CPU task scheduling may be warranted, but we
> need to have a solution for finer-grained scenarios where
> 
> * In the memory-mapped case, there is no overhead and
> * The programming model is not too burdensome to the library developer

Well, the asynchronous I/O programming model *will* be burdensome at
least until C++ gets coroutines (which may happen in C++20, and
therefore be usable somewhere around 2024 for Arrow?).

Regards

Antoine.

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Wes McKinney <we...@gmail.com>.
On Mon, Jul 22, 2019 at 11:42 AM Antoine Pitrou <so...@pitrou.net> wrote:
>
> On Mon, 22 Jul 2019 11:07:43 -0500
> Wes McKinney <we...@gmail.com> wrote:
> >
> > Right, which is why I'm suggesting a simple model to allow threads
> > that are waiting on IO to allow other threads to execute.
>
> If you are doing memory-mapped IO, how do you plan to tell whether and
> when you'll be going to wait for IO?
>

Probably the way is to introduce async-capable read APIs into the file
interfaces. For example:

file->ReadAsyncBlock(thread_ctx, ...);

That way the file implementation can decide whether asynchronous logic
is actually needed. I doubt very much that a one-size-fits-all
concurrency solution can be developed -- in some applications
coarse-grained IO and CPU task scheduling may be warranted, but we
need to have a solution for finer-grained scenarios where

* In the memory-mapped case, there is no overhead and
* The programming model is not too burdensome to the library developer

> Regards
>
> Antoine.
>
>

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Antoine Pitrou <so...@pitrou.net>.
On Mon, 22 Jul 2019 11:07:43 -0500
Wes McKinney <we...@gmail.com> wrote:
> 
> Right, which is why I'm suggesting a simple model to allow threads
> that are waiting on IO to allow other threads to execute.

If you are doing memory-mapped IO, how do you plan to tell whether and
when you'll be going to wait for IO?

Regards

Antoine.



Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Wes McKinney <we...@gmail.com>.
On Mon, Jul 22, 2019 at 10:49 AM Antoine Pitrou <an...@python.org> wrote:
>
>
> Le 18/07/2019 à 00:25, Wes McKinney a écrit :
> >
> > * We look forward in the stream until we find a complete Thrift data
> > page header. This may trigger 0 or more (possibly multiple) Read calls
> > to the underlying "file" handle. In the default case, the data is all
> > actually in memory so the reads are zero copy buffer slices.
>
> If the file is memory-mapped, it doesn't mean everything is in RAM.
> Starting to read a page may incur a page fault and some unexpected
> blocking I/O.
>
> The solution to hide I/O costs could be to use madvise() (in which case
> the background read is done by the kernel without any need for
> user-visible IO threads).  Similarly, on a regular file one can use
> fadvise().  This may mean that the whole issue of "how to hide I/O for a
> given source" may be stream-specific (for example, if a file is
> S3-backed, perhaps you want to issue a HTTP fetch in background?).
>

I think we need to be designing around remote filesystems with
unpredictable latency and throughput. Anyone involved in data
warehousing systems in the cloud is going to be intimately familiar
with these issues -- a system that's designed around local disk and
memory-mapping generally isn't going to adapt well to remote
filesystems.

> > # Model B (CPU and IO work split into tasks that execute on different
> > thread queues)
> >
> > Pros
> > - Not sure
> >
> > Cons
> > - Could cause performance issues if the IO tasks are mostly free (e.g.
> > due to buffering)
>
> In the model B, the decision of whether to use a background thread or
> some other means of hiding I/O costs could also be pushed down into the
> stream implementation.
>
> > I think we need to investigate some asynchronous C++ programming libraries like
> >
> > https://github.com/facebook/folly/tree/master/folly/fibers
> >
> > to see how organizations with mature C++ practices are handling these
> > issues from a programming model standpoint
>
> Well, right now our model is synchronous I/O.  If we want to switch to
> asynchronous I/O we'll have to redesign a lot of APIs.  Also, since C++
> doesn't have a convenient story for asynchronous I/O or coroutines
> (yet), this will make programming similarly significantly more painful,
> which is (IMO) something we'd like to avoid.  And I'm not mentioning the
> problem of mapping the C++ asynchronous I/O model on the corresponding
> Python primitives...
>

Right, which is why I'm suggesting a simple model to allow threads
that are waiting on IO to allow other threads to execute. Currently
they block.

>
> More generally, I'm wary of significantly complicating our I/O handling
> until we have reliable reproducers of I/O-originated performance issues
> with Arrow.
>

If it helps, I can spend some time implementing Model A as it relates
to reading Parquet files in parallel. If you introduce a small amount
of latency into reads (10-50ms per read call -- such as you would
experience using Amazon S3) the current synchronous approach will have
significant IO-wait-related performance issues.

> Regards
>
> Antoine.

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Antoine Pitrou <an...@python.org>.
Le 18/07/2019 à 00:25, Wes McKinney a écrit :
> 
> * We look forward in the stream until we find a complete Thrift data
> page header. This may trigger 0 or more (possibly multiple) Read calls
> to the underlying "file" handle. In the default case, the data is all
> actually in memory so the reads are zero copy buffer slices.

If the file is memory-mapped, it doesn't mean everything is in RAM.
Starting to read a page may incur a page fault and some unexpected
blocking I/O.

The solution to hide I/O costs could be to use madvise() (in which case
the background read is done by the kernel without any need for
user-visible IO threads).  Similarly, on a regular file one can use
fadvise().  This may mean that the whole issue of "how to hide I/O for a
given source" may be stream-specific (for example, if a file is
S3-backed, perhaps you want to issue a HTTP fetch in background?).

> # Model B (CPU and IO work split into tasks that execute on different
> thread queues)
> 
> Pros
> - Not sure
> 
> Cons
> - Could cause performance issues if the IO tasks are mostly free (e.g.
> due to buffering)

In the model B, the decision of whether to use a background thread or
some other means of hiding I/O costs could also be pushed down into the
stream implementation.

> I think we need to investigate some asynchronous C++ programming libraries like
> 
> https://github.com/facebook/folly/tree/master/folly/fibers
> 
> to see how organizations with mature C++ practices are handling these
> issues from a programming model standpoint

Well, right now our model is synchronous I/O.  If we want to switch to
asynchronous I/O we'll have to redesign a lot of APIs.  Also, since C++
doesn't have a convenient story for asynchronous I/O or coroutines
(yet), this will make programming similarly significantly more painful,
which is (IMO) something we'd like to avoid.  And I'm not mentioning the
problem of mapping the C++ asynchronous I/O model on the corresponding
Python primitives...


More generally, I'm wary of significantly complicating our I/O handling
until we have reliable reproducers of I/O-originated performance issues
with Arrow.

Regards

Antoine.

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Wes McKinney <we...@gmail.com>.
I've been looking at little bit at this in the context of Parquet files

One of the read hot paths in cpp/src/parquet is the function that
reads and decompresses data pages from the stream:

(SerializedPageReader::NextPage)
https://github.com/apache/arrow/blob/master/cpp/src/parquet/column_reader.cc#L143

The control flow goes like this:

* We look forward in the stream until we find a complete Thrift data
page header. This may trigger 0 or more (possibly multiple) Read calls
to the underlying "file" handle. In the default case, the data is all
actually in memory so the reads are zero copy buffer slices. The
reason we don't _always_ do zero copy is that some users want to
reduce the RAM footprint of the decoder (so we don't hold the whole
compressed column chunk in memory all at once)
* We deserialize the data page header
* We perform a Read to read the data page body
* We decompress the data page body
* Control returns to the main decoding loop that materializes values
from each data page into the output buffer

Under the programming models proposed

# Model A (CPU threads signal "idleness", causing a temporary increase
in the number of running tasks)

Pros:
- relatively simple for the developer. Instead of writing

stream_->Peek(allowed_page_size, &buffer);

we write something like

exec_ctx_->WaitIO([&]() { stream_->Peek(allowed_page_size, &buffer); };

The IO-wait signal could also be pushed down into the stream_'s
implementation so in the zero-copy case there is no overhead

Cons
- Not sure the context-switching implications since a hot loop might
cause jumps between CPU cores (I'm really out of my depth here...). It
makes me wonder if we need to look at something optimized for high
performance asynchronous task scheduling:

# Model B (CPU and IO work split into tasks that execute on different
thread queues)

Pros
- Not sure

Cons
- Could cause performance issues if the IO tasks are mostly free (e.g.
due to buffering)

The ideal approach could actually be a hybrid of Models A and B --
there's no particular reason that the programming models cannot
coexist (except that code that uses Model A approach might make code
that has optimized itself for Model B slower).

I think we need to investigate some asynchronous C++ programming libraries like

https://github.com/facebook/folly/tree/master/folly/fibers

to see how organizations with mature C++ practices are handling these
issues from a programming model standpoint

On Mon, Jul 15, 2019 at 3:15 PM Wes McKinney <we...@gmail.com> wrote:
>
> On Mon, Jul 15, 2019 at 12:01 PM Antoine Pitrou <so...@pitrou.net> wrote:
> >
> > On Mon, 15 Jul 2019 11:49:56 -0500
> > Wes McKinney <we...@gmail.com> wrote:
> > >
> > > For example, suppose we had a thread pool with a limit of 8 concurrent
> > > tasks. Now 4 of them perform IO calls. Hypothetically this should
> > > happen:
> > >
> > > * Thread pool increments a "soft limit" to allow 4 more tasks to
> > > spawn, so at this point technically we have 12 active tasks
> > > * When each IO call returns, the soft limit is decremented
> > > * The soft limit can be constrained to be some multiple of the hard
> > > limit. So if we have a hard limit of 8 CPU-bound threads, then we
> > > might allow an additional 8 tasks to be spawned if a CPU bound thread
> > > indicates that it's waiting for IO
> >
> > Well, there are two approaches to this:
> >
> > * the approach you are proposing
> > * the approach where IO is done in separate worker threads so that we
> >   needn't resize the main thread pool when IO is done
> >
> > Advantages of the second approach:
> >
> > * No need to dynamically resize the main thread pool (which may
> >   difficult to achieve in an efficient manner).
> > * CPU-bound threads can stay pinned on the same HW cores and threads
> >   most of the time, which is probably good for cache locality and to
> >   avoid migration costs.
> >
> > Advantages of the first approach:
> >
> > * The programming model is probably simpler.
> >
> > Also, the first approach is not workable if e.g. TBB doesn't support it
> > (?).
>
> Agreed with both points. I'd like to investigate these approaches to
> see what makes the most sense from a programming model and efficiency
> / performance standpoint.
>
> Currently we have lots of code that looks like (pseudocode)
>
> function Func(State* mutable_state) {
>    CPUTask1(mutable_state);
>    IOTask1(mutable_state);
>    CPUTask2(mutable_state)
>    IOTask2(mutable_state);
>    CPUTask3(mutable_state);
>    ...
> }
>
> Either approach is going to require us to develop a programming model
> where a task scheduler is passed into many functions, so such code has
> to be refactored to push work into the scheduler rather than doing the
> work in the current thread. You could certainly argue that we should
> elect for an API which maximizes our flexibility with regards to
> scheduling work (e.g. having separate thread pools for IO and CPU).
>
> Task scheduling may also need to be aware of IO resource identities to
> control concurrent reads of sources that are sensitive to that (e.g.
> some filesystems may work fine accessed by 16 threads in parallel,
> where others will not).
>
> Probably we need to figure out at least what the programming model
> ought to look like so we can start refactoring old code (e.g.
> parquet-cpp internals) and writing new code in a more
> concurrency-minded way.
>
> >
> > Regards
> >
> > Antoine.
> >
> >

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Wes McKinney <we...@gmail.com>.
On Mon, Jul 15, 2019 at 12:01 PM Antoine Pitrou <so...@pitrou.net> wrote:
>
> On Mon, 15 Jul 2019 11:49:56 -0500
> Wes McKinney <we...@gmail.com> wrote:
> >
> > For example, suppose we had a thread pool with a limit of 8 concurrent
> > tasks. Now 4 of them perform IO calls. Hypothetically this should
> > happen:
> >
> > * Thread pool increments a "soft limit" to allow 4 more tasks to
> > spawn, so at this point technically we have 12 active tasks
> > * When each IO call returns, the soft limit is decremented
> > * The soft limit can be constrained to be some multiple of the hard
> > limit. So if we have a hard limit of 8 CPU-bound threads, then we
> > might allow an additional 8 tasks to be spawned if a CPU bound thread
> > indicates that it's waiting for IO
>
> Well, there are two approaches to this:
>
> * the approach you are proposing
> * the approach where IO is done in separate worker threads so that we
>   needn't resize the main thread pool when IO is done
>
> Advantages of the second approach:
>
> * No need to dynamically resize the main thread pool (which may
>   difficult to achieve in an efficient manner).
> * CPU-bound threads can stay pinned on the same HW cores and threads
>   most of the time, which is probably good for cache locality and to
>   avoid migration costs.
>
> Advantages of the first approach:
>
> * The programming model is probably simpler.
>
> Also, the first approach is not workable if e.g. TBB doesn't support it
> (?).

Agreed with both points. I'd like to investigate these approaches to
see what makes the most sense from a programming model and efficiency
/ performance standpoint.

Currently we have lots of code that looks like (pseudocode)

function Func(State* mutable_state) {
   CPUTask1(mutable_state);
   IOTask1(mutable_state);
   CPUTask2(mutable_state)
   IOTask2(mutable_state);
   CPUTask3(mutable_state);
   ...
}

Either approach is going to require us to develop a programming model
where a task scheduler is passed into many functions, so such code has
to be refactored to push work into the scheduler rather than doing the
work in the current thread. You could certainly argue that we should
elect for an API which maximizes our flexibility with regards to
scheduling work (e.g. having separate thread pools for IO and CPU).

Task scheduling may also need to be aware of IO resource identities to
control concurrent reads of sources that are sensitive to that (e.g.
some filesystems may work fine accessed by 16 threads in parallel,
where others will not).

Probably we need to figure out at least what the programming model
ought to look like so we can start refactoring old code (e.g.
parquet-cpp internals) and writing new code in a more
concurrency-minded way.

>
> Regards
>
> Antoine.
>
>

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Antoine Pitrou <so...@pitrou.net>.
On Mon, 15 Jul 2019 11:49:56 -0500
Wes McKinney <we...@gmail.com> wrote:
> 
> For example, suppose we had a thread pool with a limit of 8 concurrent
> tasks. Now 4 of them perform IO calls. Hypothetically this should
> happen:
> 
> * Thread pool increments a "soft limit" to allow 4 more tasks to
> spawn, so at this point technically we have 12 active tasks
> * When each IO call returns, the soft limit is decremented
> * The soft limit can be constrained to be some multiple of the hard
> limit. So if we have a hard limit of 8 CPU-bound threads, then we
> might allow an additional 8 tasks to be spawned if a CPU bound thread
> indicates that it's waiting for IO

Well, there are two approaches to this:

* the approach you are proposing
* the approach where IO is done in separate worker threads so that we
  needn't resize the main thread pool when IO is done

Advantages of the second approach:

* No need to dynamically resize the main thread pool (which may
  difficult to achieve in an efficient manner).
* CPU-bound threads can stay pinned on the same HW cores and threads
  most of the time, which is probably good for cache locality and to
  avoid migration costs.

Advantages of the first approach:

* The programming model is probably simpler.

Also, the first approach is not workable if e.g. TBB doesn't support it
(?).

Regards

Antoine.



Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Wes McKinney <we...@gmail.com>.
On Mon, Jul 15, 2019 at 11:38 AM Antoine Pitrou <an...@python.org> wrote:
>
>
> Hi Anton,
>
> Le 12/07/2019 à 23:21, Malakhov, Anton a écrit :
> >
> > The result is that all these execution nodes scale well enough and run under 100 milliseconds on my 2 x Xeon E5-2650 v4 @ 2.20GHz, 128Gb RAM while CSV reader takes several seconds to complete even reading from in-memory file (8Gb), thus it is not IO bound yet even with good consumer-grade SSDs. Thus my focus recently has been around optimization of CSV parser where I have achieved 50% improvement substituting all the small object allocations via TBB scalable allocator and using TBB-based memory pool instead of default one with pre-allocated huge (2Mb) memory pages (echo 30000 > /proc/sys/vm/nr_hugepages). I found no way yet how to do both of these tricks with jemalloc, so please try to beat or meet my times without TBB allocator.
>
> That sounds interesting, though optimizing memory allocations is
> probably not the most enticing use case for TBB.  Memory allocators can
> fare differently on different workloads, and just because TBB is better
> in some situation doesn't mean it'll always be better.  Similarly,
> jemalloc is not the best for every use case.
>
> Note that, as Arrow is a library, we don't want to impose a memory
> allocator on the user, hence why jemalloc is merely optional.
>
> (one reason we added the jemalloc option is that jemalloc has
> non-standard APIs for aligned allocation and reallocation, btw)
>
> > I also see other hotspots and opportunities for optimizations, some examples are memset is being heavily used while resizing buffers (why and why?) and the column builder trashes caches by not using of streaming stores.
>
> Could you open JIRA issues with your investigations?  I'd be interested
> to know what the actual execution bottlenecks are in the CSV reader.
>
> > I used TBB directly to make the execution nodes parallel, however I have also implemented a simple TBB-based ThreadPool and TaskGroup as you can see in this PR: https://github.com/aregm/arrow/pull/6
> > I see consistent improvement (up to 1200%!) on BM_ThreadedTaskGroup and BM_ThreadPoolSpawn microbenchmarks, however applying it to the real world task of CSV reader, I don't see any improvements yet.
>
> One thing you could try is shrink the block size in CSV reader and see
> when performance starts to fall significantly.  With the current
> TaskGroup overhead, small block sizes will suffer a lot.  I expect TBB
> to fare better.
>
> (and / or try a CSV file with a hundred columns or so)
>
> > Or even worse, while reading the file, TBB wastes some cycles spinning.
>
> That doesn't sound good (but is a separate issue from the main TaskGroup
> usage, IMHO).  TBB doesn't provide a facility for background IO threads
> perhaps?
>

I think we need to spend some design effort on an programming model /
API for these code paths that do a mix of IO and deserialization. This
is also a problem with Parquet files -- a CPU thread that is
deserializing a column will sit idle while it waits for IO. IMHO such
IO calls need to be able to signal to the concurrency manager that
another task can be started.

For example, suppose we had a thread pool with a limit of 8 concurrent
tasks. Now 4 of them perform IO calls. Hypothetically this should
happen:

* Thread pool increments a "soft limit" to allow 4 more tasks to
spawn, so at this point technically we have 12 active tasks
* When each IO call returns, the soft limit is decremented
* The soft limit can be constrained to be some multiple of the hard
limit. So if we have a hard limit of 8 CPU-bound threads, then we
might allow an additional 8 tasks to be spawned if a CPU bound thread
indicates that it's waiting for IO

I think that any code in the codebase that does a mix of CPU and IO
should be retrofitted with some kind of object to allow code to signal
that it's about to wait for IO.

> > I'll be looking into applying more sophisticated NUMA and locality-aware tricks as I'll be cleaning paths for the data streams in the parser.
>
> Hmm, as a first approach, I don't think we should waste time trying such
> sophisticated optimizations (well, of course, you are free to do so :-)).
>
> Regards
>
> Antoine.

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

Posted by Antoine Pitrou <an...@python.org>.
Hi Anton,

Le 12/07/2019 à 23:21, Malakhov, Anton a écrit :
> 
> The result is that all these execution nodes scale well enough and run under 100 milliseconds on my 2 x Xeon E5-2650 v4 @ 2.20GHz, 128Gb RAM while CSV reader takes several seconds to complete even reading from in-memory file (8Gb), thus it is not IO bound yet even with good consumer-grade SSDs. Thus my focus recently has been around optimization of CSV parser where I have achieved 50% improvement substituting all the small object allocations via TBB scalable allocator and using TBB-based memory pool instead of default one with pre-allocated huge (2Mb) memory pages (echo 30000 > /proc/sys/vm/nr_hugepages). I found no way yet how to do both of these tricks with jemalloc, so please try to beat or meet my times without TBB allocator.

That sounds interesting, though optimizing memory allocations is
probably not the most enticing use case for TBB.  Memory allocators can
fare differently on different workloads, and just because TBB is better
in some situation doesn't mean it'll always be better.  Similarly,
jemalloc is not the best for every use case.

Note that, as Arrow is a library, we don't want to impose a memory
allocator on the user, hence why jemalloc is merely optional.

(one reason we added the jemalloc option is that jemalloc has
non-standard APIs for aligned allocation and reallocation, btw)

> I also see other hotspots and opportunities for optimizations, some examples are memset is being heavily used while resizing buffers (why and why?) and the column builder trashes caches by not using of streaming stores.

Could you open JIRA issues with your investigations?  I'd be interested
to know what the actual execution bottlenecks are in the CSV reader.

> I used TBB directly to make the execution nodes parallel, however I have also implemented a simple TBB-based ThreadPool and TaskGroup as you can see in this PR: https://github.com/aregm/arrow/pull/6
> I see consistent improvement (up to 1200%!) on BM_ThreadedTaskGroup and BM_ThreadPoolSpawn microbenchmarks, however applying it to the real world task of CSV reader, I don't see any improvements yet.

One thing you could try is shrink the block size in CSV reader and see
when performance starts to fall significantly.  With the current
TaskGroup overhead, small block sizes will suffer a lot.  I expect TBB
to fare better.

(and / or try a CSV file with a hundred columns or so)

> Or even worse, while reading the file, TBB wastes some cycles spinning.

That doesn't sound good (but is a separate issue from the main TaskGroup
usage, IMHO).  TBB doesn't provide a facility for background IO threads
perhaps?

> I'll be looking into applying more sophisticated NUMA and locality-aware tricks as I'll be cleaning paths for the data streams in the parser.

Hmm, as a first approach, I don't think we should waste time trying such
sophisticated optimizations (well, of course, you are free to do so :-)).

Regards

Antoine.