Posted to dev@arrow.apache.org by Wes McKinney <we...@gmail.com> on 2022/05/02 23:19:35 UTC

[C++] Control flow and scheduling in C++ Engine operators / exec nodes

hi all,

I've been catching up on the C++ execution engine codebase after a
fairly long development hiatus.

I have several questions / comments about the current design of
ExecNode and its implementations (currently: source / scan, filter,
project, union, aggregate, sink, hash join).

My current understanding of how things work is the following:

* Scan/Source nodes initiate execution through the StartProducing()
function, which spawns an asynchronous generator that yields a
sequence of input data batches. When each batch is available, it is
passed to child operators by calling their InputReceived methods

* When InputReceived is called
    * For non-blocking operators (e.g. Filter, Project), the unit of
work is performed immediately and the result is passed to the child
operator by calling its InputReceived method
    * For blocking operators (e.g. HashAggregate, HashJoin), partial
results are accumulated until the operator can begin producing output
(all input for aggregation, or until the HT has been built for the
HashJoin)

* When an error occurs, a signal to abort will be propagated up and
down the execution tree

* Eventually output lands in a Sink node, which is the desired result
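
As a rough illustration of the push model described in the bullets
above, here is a minimal sketch. It is not the Arrow API: Batch, Node,
FilterNode, SumNode, CollectSink, SourceNode, InputFinished and
RunPushExample are hypothetical simplifications, and everything runs
synchronously on one thread, whereas the real engine is asynchronous.
Only the names StartProducing and InputReceived are taken from the
text. The point is that each node decides for itself when to call the
next node.

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins; not the real Arrow ExecNode classes.
using Batch = std::vector<int64_t>;

struct Node {
  Node* output = nullptr;  // the downstream ("child" in the text) operator
  virtual ~Node() = default;
  virtual void InputReceived(Batch batch) = 0;
  virtual void InputFinished() { if (output) output->InputFinished(); }
};

// Non-blocking operator: does its work immediately and pushes downstream.
struct FilterNode : Node {
  void InputReceived(Batch batch) override {
    Batch kept;
    for (int64_t v : batch) if (v % 2 == 0) kept.push_back(v);
    output->InputReceived(std::move(kept));
  }
};

// Blocking operator: accumulates until its input is exhausted, then emits.
struct SumNode : Node {
  int64_t total = 0;
  void InputReceived(Batch batch) override {
    for (int64_t v : batch) total += v;
  }
  void InputFinished() override {
    output->InputReceived(Batch{total});
    output->InputFinished();
  }
};

// Sink: collects whatever reaches the end of the chain.
struct CollectSink : Node {
  std::vector<Batch> batches;
  bool finished = false;
  void InputReceived(Batch batch) override { batches.push_back(std::move(batch)); }
  void InputFinished() override { finished = true; }
};

// Source: the node itself drives everything downstream of it.
struct SourceNode : Node {
  std::vector<Batch> data;
  void InputReceived(Batch) override {}  // a source has no input
  void StartProducing() {
    for (const Batch& b : data) output->InputReceived(b);
    output->InputFinished();
  }
};

// Builds source -> filter -> sum -> sink and returns the single result value.
int64_t RunPushExample() {
  SourceNode source; FilterNode filter; SumNode sum; CollectSink sink;
  source.output = &filter; filter.output = &sum; sum.output = &sink;
  source.data = {{1, 2, 3, 4}, {5, 6}};
  source.StartProducing();
  return sink.batches.at(0).at(0);
}
```

Calling RunPushExample() sums the even values 2, 4 and 6. Note that no
code outside the nodes has any say in the order of execution.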

One concern I have about the current structure is the way in which
ExecNode implementations are responsible for downstream control flow,
and the extent to which operator pipelining (the same thread advancing
input-output chains until reaching a pipeline breaker) is implicit
versus explicit. To give a couple examples:

* In hash aggregations (GroupByNode), when the input has been
exhausted, the GroupByNode splits the result into the desired
execution chunk size (e.g. splitting a 1M row aggregate into batches
of 64K rows) and then spawns future tasks that push these chunks
through the child output exec node (by calling InputReceived)

* In hash joins, the ExecNode accumulates batches to be inserted into
the hash table (the "build" input, whose hash table is later probed)
until that input is exhausted, and then starts asynchronously spawning
tasks that probe the completed hash table and pass the probed results
into the child output node
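
As a small worked example of the chunking step in the GroupByNode
bullet above: assuming "1M" means 1,000,000 rows and "64K" means 65,536
rows (both are assumptions about the shorthand), the result splits into
16 chunks, the last of which holds 16,960 rows. Slice and
SplitIntoChunks are hypothetical names, not engine code.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical description of one output chunk: a row range in the result.
struct Slice {
  int64_t offset;
  int64_t length;
};

// Splits num_rows rows into consecutive slices of at most chunk_size rows.
// Assumes chunk_size > 0.
std::vector<Slice> SplitIntoChunks(int64_t num_rows, int64_t chunk_size) {
  std::vector<Slice> slices;
  for (int64_t offset = 0; offset < num_rows; offset += chunk_size) {
    slices.push_back({offset, std::min(chunk_size, num_rows - offset)});
  }
  return slices;
}
```

In the current design each such slice becomes a task that the node
itself pushes downstream, which is the coupling the next paragraph
questions.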

I would suggest that we consider a different design that decouples
task control flow from the ExecNode implementation. The purpose would
be to give the user of the C++ engine more control over task
scheduling (including the order of execution) and prioritization.

One system that does things differently from the Arrow C++ Engine is
Meta's Velox project, whose operators work like this (slightly
simplified and colored by my own imperfect understanding):

* The Driver class (which is associated with a single thread) is
responsible for execution control flow. A driver moves input batches
through an operator pipeline.

* The Driver calls the Operator::addInput function with an input
batch. Operators are blocking vs. non-blocking based on whether the
Operator::needsMoreInput() function returns true. Simple operators
like Project can produce their output immediately by calling
Operator::getOutput

* When the Driver hits a blocking operator in a pipeline, it returns
control to the calling thread so the thread can switch to doing work
for a different driver

* One artifact of this design is that hash joins are split into a
HashBuild operator and a HashProbe operator so that the build and
probe stages of the hash join can be scheduled and executed more
precisely (for example: work for the pipeline that feeds the build
operator can be prioritized over the pipeline feeding the other input
to the probe).
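
The driver-controlled flow described in the bullets above could be
sketched roughly as follows. This follows the description in this
email, not the Velox source: the method names addInput, needsMoreInput
and getOutput are taken from the text, while Batch, noMoreInput,
Doubler, Sum, Driver::Push, Driver::Finish and RunDriverExample are
hypothetical. The sketch is single-threaded and omits scheduling
across drivers.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

using Batch = std::vector<int64_t>;

// Hypothetical operator interface using the method names from the text.
struct Operator {
  virtual ~Operator() = default;
  virtual void addInput(Batch batch) = 0;
  virtual bool needsMoreInput() const = 0;       // true: cannot emit yet
  virtual std::optional<Batch> getOutput() = 0;  // nullopt: nothing to emit
  virtual void noMoreInput() {}
};

// Non-blocking operator: output is available right after addInput.
struct Doubler : Operator {
  std::optional<Batch> pending;
  void addInput(Batch b) override {
    for (int64_t& v : b) v *= 2;
    pending = std::move(b);
  }
  bool needsMoreInput() const override { return false; }
  std::optional<Batch> getOutput() override {
    return std::exchange(pending, std::nullopt);
  }
};

// Blocking operator: emits one batch only after all input has been seen.
struct Sum : Operator {
  int64_t total = 0;
  bool finished = false;
  bool emitted = false;
  void addInput(Batch b) override { for (int64_t v : b) total += v; }
  bool needsMoreInput() const override { return !finished; }
  void noMoreInput() override { finished = true; }
  std::optional<Batch> getOutput() override {
    if (!finished || emitted) return std::nullopt;
    emitted = true;
    return Batch{total};
  }
};

// The driver, not the operators, owns the control flow.
struct Driver {
  std::vector<std::unique_ptr<Operator>> ops;
  std::vector<Batch> results;

  // Moves one batch down the pipeline, starting at operator `start`.
  void Push(Batch batch, std::size_t start = 0) {
    for (std::size_t i = start; i < ops.size(); ++i) {
      ops[i]->addInput(std::move(batch));
      if (ops[i]->needsMoreInput()) return;  // blocking operator: yield
      std::optional<Batch> out = ops[i]->getOutput();
      if (!out) return;
      batch = std::move(*out);
    }
    results.push_back(std::move(batch));
  }

  // Signals end of input to each operator in order and drains its output.
  void Finish() {
    for (std::size_t i = 0; i < ops.size(); ++i) {
      ops[i]->noMoreInput();
      while (auto out = ops[i]->getOutput()) Push(std::move(*out), i + 1);
    }
  }
};

int64_t RunDriverExample() {
  Driver driver;
  driver.ops.push_back(std::make_unique<Doubler>());
  driver.ops.push_back(std::make_unique<Sum>());
  driver.Push({1, 2, 3});
  driver.Push({4});
  driver.Finish();
  return driver.results.at(0).at(0);
}
```

Because Push returns as soon as it reaches an operator that still
needs input, the caller is free to pick which driver to advance next,
which is where prioritization and ordering policies could live.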

The idea in refactoring the Arrow C++ Engine would be that, instead of
having a tree of ExecNodes, each of which has its own internal control
flow (including the ability to spawn downstream tasks), pipelinable
operators can be grouped into PipelineExecutors (which correspond
roughly to Velox's Driver concept) that are responsible for control
flow and for invoking the ExecNodes in sequence. This would make it
much easier for users to customize the control flow for particular
needs (for example, the recent discussion of adding time series joins
to the C++ engine points out that the current eager-push / "local"
control flow can create input ordering problems). I think this might
also make the codebase easier to understand and test (and maybe to
profile / trace, too), but that is just conjecture.

As a separate matter, the C++ Engine does not have a separation
between input batches (what are called "morsels" in the HyPer paper)
and pipeline tasks (smaller cache-friendly units to move through the
pipeline), nor the ability (AFAICT) to do nested parallelism / work
stealing within pipelines (this concept is discussed in [1]).

Hopefully the above makes sense and I look forward to others' thoughts.

Thanks,
Wes

[1]: https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf

Re: [C++] Control flow and scheduling in C++ Engine operators / exec nodes

Posted by Supun Kamburugamuve <su...@apache.org>.
Thanks, Weston. From your description, I can picture how the current
engine works. Let me try to map your example onto an execution. Then we
can explore it in a little more detail.

We have 300GB of data, a Read CSV operator (which creates an Arrow
Table), and a Write Parquet operator. Now let's say we have 2 CPU cores
in the system and we run both of these operators at the same time. We
read, let's say, 10GB of data and then write that 10GB of data. Since we
do this continuously, we essentially create a streaming-like system, as
shown below.

Read (0th CPU core) ----> Write (1st CPU core)

I believe what you are describing is something like the above. I may be
wrong.

There is another way to approach this problem:

Read --> Write (0th core)  (read a 10GB batch, write it, then go to the
next 10GB)
Read --> Write (1st core)  (read a 10GB batch, write it, then go to the
next 10GB)

The second one doesn't need back-pressure and is usually much more
efficient than the first one for batch processing.

Best,
Supun







-- 
Supun Kamburugamuve

Re: [C++] Control flow and scheduling in C++ Engine operators / exec nodes

Posted by Weston Pace <we...@gmail.com>.
If the amount of batch data you are processing is larger than the RAM
on the system then back pressure is needed.  A common use case is
dataset repartitioning.  If you are repartitioning a large (e.g.
300GB) dataset from CSV to parquet then the bottleneck will typically
be the "write" stage.  Backpressure must be applied or else the system
will run out of RAM.
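
To make the argument above concrete, here is a small single-threaded
simulation. All of it is a hypothetical sketch rather than engine
code: SimResult, Simulate and kNoLimit are invented names, one "batch"
stands in for 1GB, and the rates (the reader produces 3 batches per
tick, the writer drains 1) are assumed purely for illustration.

```cpp
#include <algorithm>
#include <limits>

struct SimResult {
  int peak_queued;  // most batches held in memory at once
  int ticks;        // time steps until everything is written
};

constexpr int kNoLimit = std::numeric_limits<int>::max();

// Simulates a fast reader feeding a slow writer through a queue.
// max_queued is the backpressure limit: the reader pauses when the
// queue is full. Assumes write_per_tick > 0 and max_queued > 0.
SimResult Simulate(int total_batches, int read_per_tick, int write_per_tick,
                   int max_queued) {
  int produced = 0, written = 0, queued = 0, peak = 0, ticks = 0;
  while (written < total_batches) {
    // Reader: produce as much as allowed by the remaining input and the
    // free room in the queue.
    int room = max_queued - queued;
    int n = std::min({read_per_tick, total_batches - produced, room});
    produced += n;
    queued += n;
    peak = std::max(peak, queued);
    // Writer: drain at its own, slower rate.
    int m = std::min(write_per_tick, queued);
    queued -= m;
    written += m;
    ++ticks;
  }
  return {peak, ticks};
}
```

With Simulate(300, 3, 1, kNoLimit) the queue peaks at 201 batches,
while Simulate(300, 3, 1, 4) never holds more than 4. Both runs take
300 ticks, because the writer is the bottleneck either way; the limit
only bounds memory.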

I'm also not sure I would describe the engine as a "batch processing
engine".  I think the C++ engine operates at a lower level than a
typical Spark vs. Hadoop (e.g. batch vs. streaming) abstraction.  A
streaming application and a batch application could both make use of
the C++ engine.  If I had to pick one then I would pick a "streaming
engine" because we do process data in a streaming fashion.

On Fri, May 20, 2022 at 4:14 PM Supun Kamburugamuve <su...@apache.org> wrote:
>
> Looking at the proposal I couldn't understand why there is a need for
> back-pressure handling. My understanding of the Arrow C++ engine is that it
> is meant to process batch data. So I couldn't think of why we need to
> handle back-pressure as it is normally needed in streaming engines.
>
> Best,
> Supun
>
> On Thu, May 12, 2022 at 1:14 PM Andrew Lamb <al...@influxdata.com> wrote:
>
> > Thank you for sharing this document.
> >
> > Raphael Taylor-Davies is working on a similar exercise, scheduling
> > execution for DataFusion plans. The design doc [1] and initial PR [2] may be
> > an interesting reference.
> >
> > In the DataFusion case we were trying to improve performance in a few ways:
> > 1. Within a pipeline (same definition as in C++ proposal) consume a batch
> > that was produced in the same thread if possible
> > 2. Restrict parallelism by the number of available workers rather than the
> > plan structure (e.g. if reading 100 parquet files, with 8 workers, don't
> > start reading all of them at once)
> > 3. Segregate the pools used to do async IO and CPU-bound work within the same
> > plan execution
> >
> > I think the C++ proposal would achieve 1, but it isn't clear to me that it
> > would achieve 2 (though I will admit to not fully understanding it) and I
> > don't know about 3
> >
> > While there are many similarities with what is described in the C++
> > proposal, I would say the Rust implementation is significantly less
> > complicated than what I think is described. In particular:
> > * There is no notion of generators
> > * There is no notion of internal tasks (the operators themselves are single
> > threaded and the parallelism is created by generating batches in parallel)
> > * The scheduler logic is run directly by the worker threads (rather than a
> > separate thread with message queues) as the operators produce each new
> > batch
> >
> > Andrew
> >
> > [1] https://docs.google.com/document/d/1txX60thXn1tQO1ENNT8rwfU3cXLofa7ZccnvP4jD6AA/edit#
> > [2] https://github.com/apache/arrow-datafusion/pull/2226
> >
> >
> >
> > On Thu, May 12, 2022 at 3:24 PM Li Jin <ic...@gmail.com> wrote:
> >
> > > Thanks Wes and Michal.
> > >
> > > We have a similar concern about the current eager-push control flow with
> > > time series / ordered data processing and are glad that we are not the
> > > only ones thinking about this.
> > >
> > > I have read the doc and so far have just left some questions to make sure
> > > I understand the proposal (admittedly the generator concept is somewhat
> > > new to me), and I am also thinking about it in the context of streaming
> > > ordered data processing.
> > >
> > > Excited to see where this goes,
> > > Li
> > >
> > > On Wed, May 11, 2022 at 6:43 PM Wes McKinney <we...@gmail.com> wrote:
> > >
> > > > I talked about these problems with my colleague Michal Nowakiewicz who
> > > > has been developing some of the C++ engine implementation over the
> > > > last year and a half, and he wrote up this document with some ideas
> > > > about task scheduling and control flow in the query engine for
> > > > everyone to look at and comment:
> > > >
> > > > https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/edit#
> > > >
> > > > Feedback also welcome from the Rust developers to compare/contrast
> > > > with how DataFusion works
> > > >
> > > > On Tue, May 3, 2022 at 1:05 AM Weston Pace <we...@gmail.com> wrote:
> > > > >
> > > > > Thanks for investigating and looking through this.  Your understanding
> > > > > of how things work is pretty much spot on.  In addition, I think the
> > > > > points you are making are valid.  Our ExecNode/ExecPlan interfaces are
> > > > > extremely bare bones and similar nodes have had to reimplement the
> > > > > same solutions (e.g. many nodes are using things like AtomicCounter,
> > > > > ThreadIndexer, AsyncTaskGroup, etc. in similar ways).  Probably the
> > > > > most significant short term impact of cleaning this up would be to
> > > > > avoid things like the race condition in [1] which happened because one
> > > > > node was doing things in a slightly older way.  If anyone is
> > > > > particularly interested in tackling this problem I'd be happy to go
> > > > > into more details.
> > > > >
> > > > > However, I think you are slightly overselling the potential benefits.
> > > > > I don't think this would make it easier to adopt morsel/batch,
> > > > > implement asymmetric backpressure, better scheduling, work stealing,
> > > > > or sequencing (all of which I agree are good ideas with the exception
> > > > > of work stealing which I don't think we would significantly benefit
> > > > > from).  What's more, we don't have very many nodes today and I think
> > > > > there is a risk of over-learning from this small sample size.  For
> > > > > example, this sequencing discussion is very interesting.  I think an
> > > > > asof join node is not a pipeline breaker, but it also does not fit the
> > > > > mold of a standard pipeline node.  It has multiple inputs and there is
> > > > > not a clear 1:1 mapping between input and output batches.  I don't
> > > > > know the Velox driver model well enough to comment on it specifically
> > > > > but if you were to put this node in the middle of a pipeline you might
> > > > > end up generating empty batches, too-large batches, or not enough
> > > > > thread tasks to saturate the cores.  If you were to put it between
> > > > > pipeline drivers you would potentially lose cache locality.
> > > > >
> > > > > Regarding morsel/batch.  The main thing really preventing us from
> > > > > moving to this model is the overhead cost of running small batches.
> > > > > This is due to things like the problem you described in [2] and
> > > > > somewhat demonstrated by benchmarks like [3].  As a result, as soon as
> > > > > we shrink the batch size enough to fit into L2, we start to see
> > > > > overhead increase enough to eliminate the benefits we get from better
> > > > > cache utilization (not just CPU overhead but also thread contention).
> > > > > Unfortunately, some of the fixes here could possibly involve changes
> > > > > to ExecBatch & Datum, which are used extensively in the kernel
> > > > > infrastructure.  From my profiling, this underutilization of cache is
> > > > > one of the most significant performance issues we have today.
> > > > >
> > > > > [1] https://github.com/apache/arrow/pull/12894
> > > > > [2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
> > > > > [3] https://github.com/apache/arrow/pull/12755
> > > > > On Mon, May 2, 2022 at 1:20 PM Wes McKinney <we...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > hi all,
> > > > > >
> > > > > > I've been catching up on the C++ execution engine codebase after a
> > > > > > fairly long development hiatus.
> > > > > >
> > > > > > I have several questions / comments about the current design of the
> > > > > > ExecNode and their implementations (currently: source / scan,
> > filter,
> > > > > > project, union, aggregate, sink, hash join).
> > > > > >
> > > > > > My current understanding of how things work is the following:
> > > > > >
> > > > > > * Scan/Source nodes initiate execution through the StartProducing()
> > > > > > function, which spawns an asynchronous generator that yields a
> > > > > > sequence of input data batches. When each batch is available, it is
> > > > > > passed to child operators by calling their InputReceived methods
> > > > > >
> > > > > > * When InputReceived is called
> > > > > >     * For non-blocking operators (e.g. Filter, Project), the unit
> > of
> > > > > > work is performed immediately and the result is passed to the child
> > > > > > operator by calling its InputReceived method
> > > > > >     * For blocking operators (e.g. HashAggregate, HashJoin),
> > partial
> > > > > > results are accumulated until the operator can begin producing
> > output
> > > > > > (all input for aggregation, or until the HT has been built for the
> > > > > > HashJoin)
> > > > > >
> > > > > > * When an error occurs, a signal to abort will be propagated up and
> > > > > > down the execution tree
> > > > > >
> > > > > > > * Eventually output lands in a Sink node, which is the desired result
> > > > > > >
> > > > > > > One concern I have about the current structure is the way in which
> > > > > > > ExecNode implementations are responsible for downstream control flow,
> > > > > > > and the extent to which operator pipelining (the same thread advancing
> > > > > > > input-output chains until reaching a pipeline breaker) is implicit
> > > > > > > versus explicit. To give a couple examples:
> > > > > > >
> > > > > > > * In hash aggregations (GroupByNode), when the input has been
> > > > > > > exhausted, the GroupByNode splits the result into the desired
> > > > > > > execution chunk size (e.g. splitting a 1M row aggregate into batches
> > > > > > > of 64K rows) and then spawns future tasks that push these chunks
> > > > > > > through the child output exec node (by calling InputReceived)
> > > > > > >
> > > > > > > * In hash joins, the ExecNode accumulates batches to be inserted into
> > > > > > > the hash table (the "probed" input), until the probed input is
> > > > > > > exhausted, and then start asynchronously spawning tasks to probe the
> > > > > > > completed hash table and passing the probed results into the child
> > > > > > > output node
> > > > > > >
> > > > > > > I would suggest that we consider a different design that decouples
> > > > > > > task control flow from the ExecNode implementation. The purpose would
> > > > > > > be to give the user of the C++ engine more control over task
> > > > > > > scheduling (including the order of execution) and prioritization.
> > > > > > >
> > > > > > > One system that does things different from the Arrow C++ Engine is
> > > > > > > Meta's Velox project, whose operators work like this (slightly
> > > > > > > simplified and colored by my own imperfect understanding):
> > > > > > >
> > > > > > > * The Driver class (which is associated with a single thread) is
> > > > > > > responsible for execution control flow. A driver moves input batches
> > > > > > > through an operator pipeline.
> > > > > > >
> > > > > > > * The Driver calls the Operator::addInput function with an input
> > > > > > > batch. Operators are blocking vs. non-blocking based on whether the
> > > > > > > Operator::needsMoreInput() function returns true. Simple operators
> > > > > > > like Project can produce their output immediately by calling
> > > > > > > Operator::getOutput
> > > > > > >
> > > > > > > * When the Driver hits a blocking operator in a pipeline, it returns
> > > > > > > control to the calling thread so the thread can switch to doing work
> > > > > > > for a different driver
> > > > > > >
> > > > > > > * One artifact of this design is that hash joins are split into a
> > > > > > > HashBuild operator and a HashProbe operator so that the build and
> > > > > > > probe stages of the hash join can be scheduled and executed more
> > > > > > > precisely (for example: work for the pipeline that feeds the build
> > > > > > > operator can be prioritized over the pipeline feeding the other input
> > > > > > > to the probe).
> > > > > > >
> > > > > > > The idea in refactoring the Arrow C++ Engine would be instead of
> > > > > > > having a tree of ExecNodes, each of which has its own internal control
> > > > > > > flow (including the ability to spawn downstream tasks), instead
> > > > > > > pipelinable operators can be grouped into PipelineExecutors (which
> > > > > > > correspond roughly to Velox's Driver concept) which are responsible
> > > > > > > for control flow and invoking the ExecNodes in sequence. This would
> > > > > > > make it much easier for users to customize the control flow for
> > > > > > > particular needs (for example, the recent discussion of adding time
> > > > > > > series joins to the C++ engine means that the current eager-push /
> > > > > > > "local" control flow can create problematic input ordering problems).
> > > > > > > I think this might make the codebase easier to understand and test
> > > > > > > also (and profile / trace, maybe, too), but that is just conjecture.
> > > > > > >
> > > > > > > As a separate matter, the C++ Engine does not have a separation
> > > > > > > between input batches (what are called "morsels" in the HyPer paper)
> > > > > > > and pipeline tasks (smaller cache-friendly units to move through the
> > > > > > > pipeline), nor the ability (AFAICT) to do nested parallelism / work
> > > > > > > stealing within pipelines (this concept is discussed in [1]).
> > > > > > >
> > > > > > > Hopefully the above makes sense and I look forward to others' thoughts.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Wes
> > > > > > >
> > > > > > > [1]: https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf
> > > >
> > >
> >
>
>
> --
> Supun Kamburugamuve

Re: [C++] Control flow and scheduling in C++ Engine operators / exec nodes

Posted by Supun Kamburugamuve <su...@apache.org>.
Looking at the proposal, I couldn't understand why there is a need for
back-pressure handling. My understanding is that the Arrow C++ engine is
meant to process batch data, so I don't see why we need to handle
back-pressure, which is normally a concern for streaming engines.

Best,
Supun.



-- 
Supun Kamburugamuve

Re: [C++] Control flow and scheduling in C++ Engine operators / exec nodes

Posted by Andrew Lamb <al...@influxdata.com>.
Thank you for sharing this document.

Raphael Taylor-Davies is working on a similar exercise, scheduling
execution for DataFusion plans. The design doc [1] and initial PR [2] may be
an interesting reference.

In the DataFusion case we were trying to improve performance in a few ways:
1. Within a pipeline (same definition as in C++ proposal) consume a batch
that was produced in the same thread if possible
2. Restrict parallelism by the number of available workers rather than the
plan structure (e.g. if reading 100 parquet files, with 8 workers, don't
start reading all of them at once)
3. Segregate the pools used for async IO and CPU-bound work within the same
plan execution
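
To make goal 2 concrete, here is a minimal C++ sketch of capping parallelism by the worker count instead of by the number of inputs in the plan. It is an illustration only, not DataFusion or Arrow code: `ScanStats`, `BoundedScan`, and the `yield()` call standing in for a file read are all hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Result of a bounded scan: how many inputs were read and the highest number
// of reads that were ever in flight at the same time.
struct ScanStats {
  std::size_t files_read;
  std::size_t peak_in_flight;
};

// Hypothetical helper: "read" num_files inputs using exactly num_workers
// threads. A worker claims the next unread file only after finishing its
// current one, so concurrency is capped by the worker count, not by the
// number of files in the plan.
inline ScanStats BoundedScan(std::size_t num_files, std::size_t num_workers) {
  assert(num_workers > 0);
  std::atomic<std::size_t> next_file{0};
  std::atomic<std::size_t> in_flight{0};
  std::atomic<std::size_t> peak{0};
  std::atomic<std::size_t> done{0};

  auto worker = [&] {
    for (;;) {
      const std::size_t file = next_file.fetch_add(1);
      if (file >= num_files) return;  // nothing left to claim
      const std::size_t now = in_flight.fetch_add(1) + 1;
      // Record the high-water mark of concurrent reads.
      std::size_t seen = peak.load();
      while (now > seen && !peak.compare_exchange_weak(seen, now)) {
      }
      std::this_thread::yield();  // stand-in for the actual file read
      in_flight.fetch_sub(1);
      done.fetch_add(1);
    }
  };

  std::vector<std::thread> pool;
  for (std::size_t i = 0; i < num_workers; ++i) pool.emplace_back(worker);
  for (auto& t : pool) t.join();

  ScanStats stats;
  stats.files_read = done.load();
  stats.peak_in_flight = peak.load();
  return stats;
}
```

With 100 files and 8 workers, `BoundedScan(100, 8)` reads every file while never having more than 8 reads in flight, which matches the parquet example in item 2.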

I think the C++ proposal would achieve 1, but it isn't clear to me that it
would achieve 2 (though I will admit to not fully understanding it), and I
don't know about 3.

While there are many similarities with what is described in the C++
proposal, I would say the Rust implementation is significantly less
complicated than what I think is described. In particular:
* There is no notion of generators
* There is no notion of internal tasks (the operators themselves are single
threaded and the parallelism is created by generating batches in parallel)
* The scheduler logic is run directly by the worker threads (rather than a
separate thread with message queues) as the operators produce each new batch
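
As a rough illustration of the last two points, the C++ sketch below has the thread that holds a batch advance it through every operator of a pipeline itself, with no separate scheduler thread or queue in between. It is a simplified model, not the DataFusion implementation: `Batch`, `Operator`, `RunPipeline`, and `MakeExamplePipeline` are hypothetical names, and a vector of ints stands in for a record batch.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

using Batch = std::vector<int>;  // stand-in for a record batch

// A single-threaded operator: transforms one batch in place and holds no
// tasks or generators of its own.
using Operator = std::function<void(Batch&)>;

// Hypothetical worker-side scheduling step: the thread that produced `batch`
// pushes it through each operator in order and returns the final result.
inline Batch RunPipeline(const std::vector<Operator>& pipeline, Batch batch) {
  for (const Operator& op : pipeline) {
    assert(static_cast<bool>(op));  // every pipeline slot must hold a callable
    op(batch);
    if (batch.empty()) break;  // nothing left to pass downstream
  }
  return batch;
}

// Example pipeline: keep even values (filter), then double them (project).
inline std::vector<Operator> MakeExamplePipeline() {
  std::vector<Operator> pipeline;
  pipeline.push_back([](Batch& b) {
    Batch kept;
    for (int v : b) {
      if (v % 2 == 0) kept.push_back(v);
    }
    b = std::move(kept);
  });
  pipeline.push_back([](Batch& b) {
    for (int& v : b) v *= 2;
  });
  return pipeline;
}
```

In this model, parallelism would come from several worker threads each calling `RunPipeline` on their own batches; the operators themselves never spawn work.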

Andrew

[1]
https://docs.google.com/document/d/1txX60thXn1tQO1ENNT8rwfU3cXLofa7ZccnvP4jD6AA/edit#
[2] https://github.com/apache/arrow-datafusion/pull/2226




Re: [C++] Control flow and scheduling in C++ Engine operators / exec nodes

Posted by Li Jin <ic...@gmail.com>.
Thanks Wes and Michal.

We have a similar concern about the current eager-push control flow with time
series / ordered data processing and are glad that we are not the only ones
thinking about this.

I have read the doc and so far have just left some questions to make sure I
understand the proposal (admittedly the generator concept is somewhat new
to me). I am also thinking about it in the context of streaming ordered data
processing.

Excited to see where this goes,
Li

On Wed, May 11, 2022 at 6:43 PM Wes McKinney <we...@gmail.com> wrote:

> I talked about these problems with my colleague Michal Nowakiewicz who
> has been developing some of the C++ engine implementation over the
> last year and a half, and he wrote up this document with some ideas
> about task scheduling and control flow in the query engine for
> everyone to look at and comment:
>
>
> https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/edit#
>
> Feedback also welcome from the Rust developers to compare/contrast
> with how DataFusion works
>
> On Tue, May 3, 2022 at 1:05 AM Weston Pace <we...@gmail.com> wrote:
> >
> > Thanks for investigating and looking through this.  Your understanding
> > of how things work is pretty much spot on.  In addition, I think the
> > points you are making are valid.  Our ExecNode/ExecPlan interfaces are
> > extremely bare bones and similar nodes have had to reimplement the
> > same solutions (e.g. many nodes are using things like AtomicCounter,
> > ThreadIndexer, AsyncTaskGroup, etc. in similar ways).  Probably the
> > most significant short term impact of cleaning this up would be to
> > avoid things like the race condition in [1] which happened because one
> > node was doing things in a slightly older way.  If anyone is
> > particularly interested in tackling this problem I'd be happy to go
> > into more details.
> >
> > However, I think you are slightly overselling the potential benefits.
> > I don't think this would make it easier to adopt morsel/batch,
> > implement asymmetric backpressure, better scheduling, work stealing,
> > or sequencing (all of which I agree are good ideas with the exception
> > of work stealing which I don't think we would significantly benefit
> > from).  What's more, we don't have very many nodes today and I think
> > there is a risk of over-learning from this small sample size.  For
> > example, this sequencing discussion is very interesting.  I think an
> > asof join node is not a pipeline breaker, but it also does not fit the
> > mold of a standard pipeline node.  It has multiple inputs and there is
> > not a clear 1:1 mapping between input and output batches.  I don't
> > know the Velox driver model well enough to comment on it specifically
> > but if you were to put this node in the middle of a pipeline you might
> > end up generating empty batches, too-large batches, or not enough
> > thread tasks to saturate the cores.  If you were to put it between
> > pipeline drivers you would potentially lose cache locality.
> >
> > Regarding morsel/batch.  The main thing really preventing us from
> > moving to this model is the overhead cost of running small batches.
> > This is due to things like the problem you described in [2] and
> > somewhat demonstrated by benchmarks like [3].  As a result, as soon as
> > we shrink the batch size small enough to fit into L2, we start to see
> > overhead increase to eliminate the benefits we get from better cache
> > utilization (not just CPU overhead but also thread contention).
> > Unfortunately, some of the fixes here could possibly involve changes
> > to ExecBatch & Datum, which are used extensively in the kernel
> > infrastructure.  From my profiling, this underutilization of cache is
> > one of the most significant performance issues we have today.
> >
> > [1] https://github.com/apache/arrow/pull/12894
> > [2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
> > [3] https://github.com/apache/arrow/pull/12755
>

Re: [C++] Control flow and scheduling in C++ Engine operators / exec nodes

Posted by Wes McKinney <we...@gmail.com>.
I talked about these problems with my colleague Michal Nowakiewicz, who
has been developing some of the C++ engine implementation over the
last year and a half, and he wrote up this document with some ideas
about task scheduling and control flow in the query engine for
everyone to look at and comment on:

https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/edit#

Feedback is also welcome from the Rust developers, to compare and
contrast with how DataFusion works.

On Tue, May 3, 2022 at 1:05 AM Weston Pace <we...@gmail.com> wrote:
>
> Thanks for investigating and looking through this.  Your understanding
> of how things work is pretty much spot on.  In addition, I think the
> points you are making are valid.  Our ExecNode/ExecPlan interfaces are
> extremely bare bones and similar nodes have had to reimplement the
> same solutions (e.g. many nodes are using things like AtomicCounter,
> ThreadIndexer, AsyncTaskGroup, etc. in similar ways).  Probably the
> most significant short term impact of cleaning this up would be to
> avoid things like the race condition in [1] which happened because one
> node was doing things in a slightly older way.  If anyone is
> particularly interested in tackling this problem I'd be happy to go
> into more details.
>
> However, I think you are slightly overselling the potential benefits.
> I don't think this would make it easier to adopt morsel/batch,
> implement asymmetric backpressure, better scheduling, work stealing,
> or sequencing (all of which I agree are good ideas with the exception
> of work stealing which I don't think we would significantly benefit
> from).  What's more, we don't have very many nodes today and I think
> there is a risk of over-learning from this small sample size.  For
> example, this sequencing discussion is very interesting.  I think an
> asof join node is not a pipeline breaker, but it also does not fit the
> mold of a standard pipeline node.  It has multiple inputs and there is
> not a clear 1:1 mapping between input and output batches.  I don't
> know the Velox driver model well enough to comment on it specifically
> but if you were to put this node in the middle of a pipeline you might
> end up generating empty batches, too-large batches, or not enough
> thread tasks to saturate the cores.  If you were to put it between
> pipeline drivers you would potentially lose cache locality.
>
> Regarding morsel/batch.  The main thing really preventing us from
> moving to this model is the overhead cost of running small batches.
> This is due to things like the problem you described in [2] and
> somewhat demonstrated by benchmarks like [3].  As a result, as soon as
> we shrink the batch size small enough to fit into L2, we start to see
> overhead increase to eliminate the benefits we get from better cache
> utilization (not just CPU overhead but also thread contention).
> Unfortunately, some of the fixes here could possibly involve changes
> to ExecBatch & Datum, which are used extensively in the kernel
> infrastructure.  From my profiling, this underutilization of cache is
> one of the most significant performance issues we have today.
>
> [1] https://github.com/apache/arrow/pull/12894
> [2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
> [3] https://github.com/apache/arrow/pull/12755

Re: [C++] Control flow and scheduling in C++ Engine operators / exec nodes

Posted by Weston Pace <we...@gmail.com>.
Thanks for investigating and looking through this.  Your understanding
of how things work is pretty much spot on.  In addition, I think the
points you are making are valid.  Our ExecNode/ExecPlan interfaces are
extremely bare bones and similar nodes have had to reimplement the
same solutions (e.g. many nodes are using things like AtomicCounter,
ThreadIndexer, AsyncTaskGroup, etc. in similar ways).  Probably the
most significant short-term impact of cleaning this up would be to
avoid things like the race condition in [1], which happened because one
node was doing things in a slightly older way.  If anyone is
particularly interested in tackling this problem I'd be happy to go
into more detail.
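
To illustrate the kind of bookkeeping that several nodes currently
duplicate, here is a minimal standalone sketch of a completion
counter.  The class name BatchCountdown and its methods are
hypothetical and are not Arrow's AtomicCounter; the sketch only assumes
that batches may be counted from several threads and that the total
number of batches is learned when the input ends.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper (not Arrow's AtomicCounter): counts processed
// batches and reports completion to exactly one caller.
class BatchCountdown {
 public:
  // Record one processed batch. Returns true only for the single call
  // that observes "all batches seen".
  bool Increment() {
    const int seen = seen_.fetch_add(1) + 1;
    const int total = total_.load();
    if (total < 0 || seen != total) return false;  // total unknown or not done
    return MarkDone();
  }

  // Announce the total number of batches. Returns true only if every
  // batch had already been counted and no other call claimed completion.
  bool SetTotal(int total) {
    assert(total >= 0);
    total_.store(total);
    if (seen_.load() != total) return false;
    return MarkDone();
  }

 private:
  // Compare-and-swap so that completion is claimed at most once, even if
  // Increment() and SetTotal() race with each other.
  bool MarkDone() {
    bool expected = false;
    return done_.compare_exchange_strong(expected, true);
  }

  std::atomic<int> seen_{0};
  std::atomic<int> total_{-1};  // -1 means "total not known yet"
  std::atomic<bool> done_{false};
};
```

In this sketch a node would call Increment() for each batch it handles
and SetTotal() once the batch count is known, and would finish its
output only in the one call that returns true.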

However, I think you are slightly overselling the potential benefits.
I don't think this would make it easier to adopt morsel/batch,
implement asymmetric backpressure, better scheduling, work stealing,
or sequencing (all of which I agree are good ideas with the exception
of work stealing which I don't think we would significantly benefit
from).  What's more, we don't have very many nodes today and I think
there is a risk of over-learning from this small sample size.  For
example, this sequencing discussion is very interesting.  I think an
asof join node is not a pipeline breaker, but it also does not fit the
mold of a standard pipeline node.  It has multiple inputs and there is
not a clear 1:1 mapping between input and output batches.  I don't
know the Velox driver model well enough to comment on it specifically
but if you were to put this node in the middle of a pipeline you might
end up generating empty batches, too-large batches, or not enough
thread tasks to saturate the cores.  If you were to put it between
pipeline drivers you would potentially lose cache locality.
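
The missing 1:1 mapping can be sketched with a toy backward as-of join
over two timestamp-sorted inputs.  Everything here (Row, Joined,
AsofJoinSketch, using -1 for "no match") is hypothetical and unrelated
to any real Arrow node; the sketch assumes both inputs arrive in
nondecreasing timestamp order and it never frees old right-side rows.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <iterator>
#include <vector>

struct Row { std::int64_t ts; int value; };
struct Joined { std::int64_t ts; int left_value; int right_value; };
using Batch = std::vector<Row>;

// Hypothetical toy as-of join: each left row is matched with the latest
// right row whose timestamp is <= the left row's timestamp.
class AsofJoinSketch {
 public:
  std::vector<Joined> PushLeft(const Batch& batch) {
    pending_left_.insert(pending_left_.end(), batch.begin(), batch.end());
    return Drain();
  }

  std::vector<Joined> PushRight(const Batch& batch) {
    for (const Row& row : batch) {
      // Assumption: the right input is sorted by timestamp.
      assert(right_.empty() || right_.back().ts <= row.ts);
      right_.push_back(row);
    }
    return Drain();
  }

 private:
  // Emit every buffered left row whose match can no longer change, i.e.
  // whose timestamp is strictly below the newest right timestamp seen.
  std::vector<Joined> Drain() {
    std::vector<Joined> out;
    if (right_.empty()) return out;
    const std::int64_t watermark = right_.back().ts;
    while (!pending_left_.empty() && pending_left_.front().ts < watermark) {
      const Row left = pending_left_.front();
      pending_left_.pop_front();
      // First right row with ts > left.ts; the match is the row before it.
      auto it = std::upper_bound(
          right_.begin(), right_.end(), left.ts,
          [](std::int64_t ts, const Row& r) { return ts < r.ts; });
      const int right_value =
          (it == right_.begin()) ? -1 : std::prev(it)->value;
      out.push_back(Joined{left.ts, left.value, right_value});
    }
    return out;
  }

  std::deque<Row> pending_left_;
  std::vector<Row> right_;
};
```

Pushing a left batch before any right data has arrived yields an empty
output, while a later right batch can release several buffered left
rows at once, so output batch sizes are decoupled from input batch
sizes.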

Regarding morsel/batch: the main thing really preventing us from
moving to this model is the overhead cost of running small batches.
This is due to things like the problem you described in [2] and
somewhat demonstrated by benchmarks like [3].  As a result, as soon as
we shrink the batch size small enough to fit into L2, the per-batch
overhead (not just CPU overhead but also thread contention) grows
enough to cancel out the benefits we get from better cache
utilization.
Unfortunately, some of the fixes here could possibly involve changes
to ExecBatch & Datum, which are used extensively in the kernel
infrastructure.  From my profiling, this underutilization of cache is
one of the most significant performance issues we have today.
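
The trade-off can be written as a back-of-the-envelope cost model.
This is a sketch under stated assumptions, not a measurement: the
function name and all of its parameters (a fixed per-batch overhead,
one per-row cost when the batch fits in cache and a higher one when it
does not) are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical cost model: estimated nanoseconds spent per row when the
// input is processed in batches of `batch_rows` rows.
double EstimatedNanosPerRow(std::int64_t batch_rows,
                            double per_batch_overhead_ns,
                            double in_cache_ns_per_row,
                            double out_of_cache_ns_per_row,
                            std::int64_t rows_fitting_in_cache) {
  assert(batch_rows > 0);
  // The fixed per-batch overhead is amortized over the rows in the batch.
  const double amortized_overhead =
      per_batch_overhead_ns / static_cast<double>(batch_rows);
  // Rows are cheaper to process when the whole batch stays in cache.
  const double row_cost = (batch_rows <= rows_fitting_in_cache)
                              ? in_cache_ns_per_row
                              : out_of_cache_ns_per_row;
  return amortized_overhead + row_cost;
}
```

With a hypothetical 1000 ns per-batch overhead, 1 ns versus 2 ns per
row, and 4096 rows fitting in cache, batches of 100 rows cost 11 ns per
row, batches of 1000 rows cost 2 ns per row, and batches of 100000 rows
cost about 2.01 ns per row.  Shrinking the batch only pays off if the
fixed overhead shrinks too.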

[1] https://github.com/apache/arrow/pull/12894
[2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
[3] https://github.com/apache/arrow/pull/12755