Posted to dev@arrow.apache.org by Li Jin <ic...@gmail.com> on 2022/04/25 15:35:32 UTC

[Compute][C++] Question on compute scheduler

Hello!

I am reading the use of TaskScheduler inside C++ compute code (reading hash
join) and have some questions about it, in particular:

(1) What is the purpose of SchedulerTaskCallback defined here:
https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
(My guess is that the caller of TaskScheduler::StartTaskGroup needs to
provide an implementation of a task executor, and the implementation of
SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
implementation)

(2) When would this task context not have an executor?
https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581

(3) What's the difference between TaskImpl and TaskGroupContinuationImpl in
TaskScheduler::RegisterTaskGroup? And how would one normally define
TaskGroupContinuationImpl?

Sorry, I am still learning the Arrow compute internals and would appreciate
help understanding these.

Li

Re: [Compute][C++] Question on compute scheduler

Posted by Weston Pace <we...@gmail.com>.
Thanks Sasha, your intuition on the SerialExecutor is correct.  One of
the changes I am working on[1] will make it so that an executor is
always present.  The behavior when you do not have an executor is
rather strange (sometimes I/O threads are used and sometimes the
calling thread is used) and this leads to a lot of difficulty
understanding performance.  I plan to get back to this PR once 8.0.0
is finished but it will unfortunately require some changes to the
scanner.

I do not believe the TaskScheduler handles "do work when inputs are
ready otherwise try again later".  It was written originally with join
tasks in mind where we can't do any work until all of the input has
arrived.  So the pattern there is "queue everything in the node until
we are ready to process it all and then run the task scheduler".

Generally speaking, when a batch arrives, and you realize you have
enough data to do some work, you should just do it there.  This is how
the filter & project nodes operate.  They rely on the fact that we
will be processing many batches in parallel to get their parallelism
(e.g. no need for processing a single batch in parallel).  However, if
you've accumulated enough data that you want to process it in parallel
then you should create a task scheduler at that point.  For example,
if you have queued up batches 1,2,3, and 4 (out of 20 total batches)
and now you are ready to process them then you could create a
TaskScheduler.  You could then find yourself creating multiple task
schedulers throughout the run of a node.  The continuation would be
marking batches 1/2/3/4 done, sending out any necessary results and,
if that was the last batch group finished, marking the node finished.
This feels like a general pattern. We could probably build some
abstractions and common utilities around it if you find yourself up to
it.

Just to continue the example, let's pretend you can do work when you
get 4 batches.  So if you have 20 total batches there would be 5
"mega-tasks" worth of work.  Each "mega-task" might run in parallel
with other "mega-tasks".  Each mega-task would also, itself, spawn
smaller parallel sub-tasks.  Each mega-task would be responsible for
delivering its own output.  The last mega-task to finish is also
responsible for finishing the node.
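
For illustration, a rough sketch of that registration/start pattern. The
method names follow the shape of cpp/src/arrow/compute/exec/task_util.h at
that time, but treat the exact signatures as approximate, and note that
ProcessQueuedBatch and DeliverResultsAndMaybeFinish are hypothetical helpers:

// Sketch only: one "mega-task" group over four queued batches. Check
// task_util.h for the exact TaskScheduler signatures before relying on this.
std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();

int group_id = scheduler->RegisterTaskGroup(
    /*task_impl=*/[&](size_t thread_index, int64_t task_id) -> Status {
      // Each task processes one of the queued batches, in parallel with
      // the other tasks in the group.
      return ProcessQueuedBatch(thread_index, task_id);
    },
    /*cont_impl=*/[&](size_t thread_index) -> Status {
      // Continuation: mark batches 1-4 done, emit any results, and if this
      // was the last group, mark the node finished.
      return DeliverResultsAndMaybeFinish(thread_index);
    });
scheduler->RegisterEnd();

// Later, once batches 1-4 have been queued (thread_index comes from the
// calling task's context):
RETURN_NOT_OK(scheduler->StartTaskGroup(thread_index, group_id,
                                        /*total_num_tasks=*/4));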

[1] https://github.com/apache/arrow/pull/12468

On Mon, Apr 25, 2022 at 12:19 PM Sasha Krassovsky
<kr...@gmail.com> wrote:
>
> Hi Li,
> I’ll answer the questions in order:
>
> 1. Your guess is correct! The Hash Join may be used standalone (mostly in testing or benchmarking for now) or as part of the ExecNode. The ExecNode will pass the task to the Executor to be scheduled, or will run it immediately if it’s in sync mode (i.e. no executor). Our Hash Join benchmark uses OpenMP to schedule things, and passes a lambda that does OpenMP things to the HashJoin.
>
> 2. We might not have an executor if we want to execute synchronously. This is set during construction of the ExecContext, which is given to the ExecPlan during creation. If the ExecContext has a nullptr Executor, then we are in sync mode; otherwise we use the Executor to schedule. One confusing thing is that we also have a SerialExecutor - I’m actually not quite sure what the difference is between using that and setting the Executor to nullptr (might have something to do with testing?). @Weston probably knows
>
> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is the function that implements the work that needs to be split up, TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl will receive the index of the task. If you’re familiar with OpenMP, it’s equivalent to this:
>
> #pragma omp parallel for
> for(int i = 0; i < 100; i++)
>     TaskImpl(omp_get_thread_num(), i);
> TaskGroupContinuationImpl();
>
> Examples of the two are here:
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>
> Sasha
>

Re: [Compute][C++] Question on compute scheduler

Posted by Li Jin <ic...@gmail.com>.
I see. Yeah, spill to disk seems to be a reasonable approach. Hard back
pressure does seem like it can lead to deadlocks.

On Wed, Apr 27, 2022 at 4:55 PM Weston Pace <we...@gmail.com> wrote:

> Our backpressure is best-effort. A push downstream will never
> fail/block. Eventually, when sinks (or pipeline breakers) start to
> fill up, a pause message is sent to the source nodes. However,
> anything in progress will continue and should not be prevented from
> completing and pushing results upwards.
>
> Adding spill-to-disk to the asof join would seem more applicable if
> the as-of join was queuing all data in memory.  We are starting to
> look at that for the hash-join for example.

Re: [Compute][C++] Question on compute scheduler

Posted by Weston Pace <we...@gmail.com>.
Our backpressure is best-effort. A push downstream will never
fail/block. Eventually, when sinks (or pipeline breakers) start to
fill up, a pause message is sent to the source nodes. However,
anything in progress will continue and should not be prevented from
completing and pushing results upwards.

Adding spill-to-disk to the asof join would seem more applicable if
the as-of join was queuing all data in memory.  We are starting to
look at that for the hash-join for example.



Re: [Compute][C++] Question on compute scheduler

Posted by Li Jin <ic...@gmail.com>.
Thanks both! The ExecPlan Sequencing doc is interesting and close to the
problem that we are trying to solve (ordered processing).

One thought is that I can see some cases for deadlock if we are not
careful, for example (Filter Node -> Asof Join Node, assuming Asof Join
node requires ordered input batches):

(Sequence of events)

(1) Filter node has n threads; we got unlucky and batch index 0 is never
processed.
(2) The n threads start to process batches and send them to the downstream
node.
(3) The downstream node queues up the batches but cannot process any of them.
At some point the downstream node's queue fills up (assuming we bound the
queued batches) and it tells the Filter node "I cannot take any more batches"
(not sure if back pressure like this exists now).
(4) The Filter node has all its threads processing batches, but because the
downstream node cannot take any batches, those threads cannot make progress
either.
(5) No progress can be made on either node.

Maybe the Asof Join node in this case needs an unbounded queue (spill to
disk), or the FilterNode needs to know that it needs to process batch 0 and
stop processing other batches until the downstream node can start consuming.

Thoughts?
Li


Re: [Compute][C++] Question on compute scheduler

Posted by Weston Pace <we...@gmail.com>.
There was an old design document I proposed on this ML a while back.
I never got around to implementing it and I think it has aged somewhat
but it covers some of the points I brought up and it might be worth
reviewing.

https://docs.google.com/document/d/1MfVE9td9D4n5y-PTn66kk4-9xG7feXs1zSFf-qxQgPs/edit#heading=h.e54mys6bvhhe


Re: [Compute][C++] Question on compute scheduler

Posted by Sasha Krassovsky <kr...@gmail.com>.
An ExecPlan is composed of a bunch of implicit “pipelines”. Each node in a pipeline (starting with a source node) implements `InputReceived` and `InputFinished`. On `InputReceived`, it performs its computation and calls `InputReceived` on its output. On `InputFinished`, it performs any cleanup and calls `InputFinished` on its output (note that in the code, `outputs_` is a vector, but we only ever use `outputs_[0]`; this will probably end up getting cleaned up at some point). As such, there’s an implicit pipeline of chained calls to `InputReceived`.

Some nodes, such as Join or GroupBy or Sort, are pipeline breakers: they must accumulate the whole dataset before performing their computation and starting off the next pipeline. Pipeline breakers would make use of stuff like TaskGroup and such.

So the model of parallelism is driven by the source nodes: if your source node is multithreaded, then you may have several concurrent calls to `InputReceived`. Weston mentioned to me today that there may be a way to give some sort of guarantee of “almost-ordered” input, which may be enough to make streaming work well (you’d only have to accumulate at most `num_threads` extra batches in memory at a time). I’m not sure the details of it, but that may be possible. 
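
A hedged sketch of that push model follows. Batch, Compute, and SketchNode
are placeholders; the real interface in
cpp/src/arrow/compute/exec/exec_plan.h carries more context (which input
fired, error statuses, and so on):

#include <utility>

// Minimal sketch of the push model described above; not the real ExecNode.
struct Batch {};

class SketchNode {
 public:
  explicit SketchNode(SketchNode* output) : output_(output) {}

  void InputReceived(Batch batch) {
    Batch result = Compute(std::move(batch));  // this node's own work
    if (output_) output_->InputReceived(std::move(result));  // push downstream
  }

  void InputFinished(int total_batches) {
    // Any per-node cleanup would happen here before notifying downstream.
    if (output_) output_->InputFinished(total_batches);
  }

 private:
  Batch Compute(Batch b) { return b; }  // placeholder, e.g. a filter/project
  SketchNode* output_;  // real code keeps a vector outputs_; only [0] is used
};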

Hopefully the description of how parallelism works was at least helpful!

Sasha



Re: [Compute][C++] Question on compute scheduler

Posted by Weston Pace <we...@gmail.com>.
I think this is doable.  I think we want to introduce the concept of a
batch index.  The scanner is then responsible for assigning a batch
index to each outgoing batch.  Some ExecNodes would reset or destroy
the batch index (for example, you cannot generally do an asof join
after a hash join unless you insert a sort node in between).  The sort
node can reinstate the batch index.

An OrderedExecNode would extend from ExecNode and use this batch index.
A simple implementation would be:

OrderedExecNode has two states, processing and idle.

* If a batch arrives and it is the next batch and the state is idle:
  Move state to processing and process
  as many batches as possible until the batch
  queue does not contain the next batch
* If a batch arrives and it is not the next batch:
  Add the batch to the batch queue
* If a batch arrives and the state is not idle:
  Add the batch to the batch queue

The batch queue can be some kind of heap structure / priority queue
that will know whether the top item is "the next item".  "The next
item" is a well defined concept as long as the batch index is
monotonically increasing (e.g. this could introduce some complication
if the asof join is after a filter node which eliminates an entire
batch, but we can always emit an empty batch).
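
As a hedged illustration (SequencingQueue and IndexedBatch are made-up
names, not existing Arrow utilities), such a queue could look like:

#include <cstdint>
#include <queue>
#include <utility>
#include <vector>

// Batches are held in a min-heap keyed by batch index; Push tells the
// caller whether the head of the heap is now "the next item".
struct IndexedBatch {
  int64_t index;
  // ... payload ...
};

class SequencingQueue {
 public:
  // Returns true when the next expected batch is at the top of the heap.
  bool Push(IndexedBatch batch) {
    heap_.push(std::move(batch));
    return heap_.top().index == next_index_;
  }

  // Pops batches in index order for as long as they are contiguous.
  std::vector<IndexedBatch> PopReady() {
    std::vector<IndexedBatch> ready;
    while (!heap_.empty() && heap_.top().index == next_index_) {
      ready.push_back(heap_.top());
      heap_.pop();
      ++next_index_;
    }
    return ready;
  }

 private:
  struct ByIndexDesc {
    bool operator()(const IndexedBatch& l, const IndexedBatch& r) const {
      return l.index > r.index;  // makes the priority_queue a min-heap
    }
  };
  std::priority_queue<IndexedBatch, std::vector<IndexedBatch>, ByIndexDesc> heap_;
  int64_t next_index_ = 0;
};

A real node would wrap Push/PopReady in the mutex and the idle/processing
state machine described above.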

I call the above a simple implementation because it runs serially and
so this could become a choke point for a multithreaded engine.  A more
complex implementation would change "process the batch" to
"synchronously increment counters and update shared state and then
launch a task that can run in parallel".  For example, in an as-of
join I think you could do better to avoid a serial choke point.


Re: [Compute][C++] Question on compute scheduler

Posted by Li Jin <ic...@gmail.com>.
Hey thanks again for the reply!

> I would suggest accumulating all batches just like in Hash Join
This is something I intentionally try to avoid because asof join (and many
other time series operations) can be performed in a streaming fashion to
reduce memory footprint.

> When you want to scale up to multiple threads, you will no longer be able
> to rely on any order because scheduling is generally pretty nondeterministic.
I think this depends on the implementation. If all the nodes receive and
output ordered batches, inside each node they can still use multiple
threads to compute the result. Extra work needs to be done to ensure the
ordering of the output data, but I don't think it is infeasible to
multi-thread a streaming algorithm.

Maybe a broader question is "What is a good way to implement an ExecNode
that requires ordered inputs and produces ordered output, and how does that
fit with the current parallelization model of Arrow compute?"

To be honest, I am not very familiar with the parallelization model. I have
the vague impression that each ExecNode has its own processing thread and
data is sent asynchronously between nodes (i.e., each node will queue the
input batches instead of blocking the upstream node's thread). I also have
the impression that some nodes do parallelization internally, but I am not
sure how they would output (i.e., do they output batches / call
InputReceived on downstream nodes from multiple threads)?





Re: [Compute][C++] Question on compute scheduler

Posted by Sasha Krassovsky <kr...@gmail.com>.
I would advise against relying on any specific ordering of batches coming in. When you want to scale up to multiple threads, you will no longer be able to rely on any order because scheduling is generally pretty nondeterministic. I would suggest accumulating all batches just like in Hash Join, and when InputFinished is called, sort them. I’d even suggest not relying on input batches being sorted within themselves, so you’ll have to implement a mini “sort node” (if you don’t have some other fancier data structure for this join). The accumulation itself shouldn’t be a performance hit either because the threads that would be processing the join will continue processing the inputs to the join, so the overall throughput shouldn’t be affected. 

After the sorting, you can kick off a task group that will compute the results. One thing you’ll have to experiment with is how many tasks to start: one for each pair of batches, or one for each left-side batch, or one for each right-side batch. If it’s the first, it may be useful to extend the TaskGroup interface to allow for two-dimensional task groups (so that it would be easier to start a task for each pair).  
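
For illustration, a minimal sketch of the accumulate-then-sort step (Batch
and begin_ts are placeholders; this does not mirror the actual hash join
code):

#include <algorithm>
#include <cstdint>
#include <mutex>
#include <utility>
#include <vector>

// Sketch of "accumulate on InputReceived, sort on InputFinished".
struct Batch {
  int64_t begin_ts = 0;  // placeholder: first timestamp in the batch
  // ... payload ...
};

class AccumulatingSide {
 public:
  void InputReceived(Batch batch) {
    std::lock_guard<std::mutex> guard(mutex_);  // may be called concurrently
    accum_.push_back(std::move(batch));
  }

  void InputFinished() {
    // All input is in; sort once rather than relying on arrival order.
    std::sort(accum_.begin(), accum_.end(),
              [](const Batch& l, const Batch& r) {
                return l.begin_ts < r.begin_ts;
              });
    // ...then kick off the task group over the sorted batches.
  }

 private:
  std::mutex mutex_;
  std::vector<Batch> accum_;
};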

Sasha



Re: [Compute][C++] Question on compute scheduler

Posted by Li Jin <ic...@gmail.com>.
> In order to produce an output for a left batch, I would need to wait until
I received enough batches from the right tables to cover all potential
matches (wait until I have seen right timestamps outside the matching range)

To add a bit more explanation: let's say the time range of the current left
batch is (20200101, 20200201). In order to produce the join result for this
batch, I need to receive all data from the right tables in (-inf, 20200201),
and I know I have it once I have seen data from all right tables that have a
timestamp after 20200201 (since data arrives in time order). But I do not
need to receive all the data in order to produce an output batch (unlike
hash join).
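
A small sketch of that readiness check (hypothetical, not from the Arrow
codebase): each right table tracks the maximum timestamp it has seen, and
the left batch ending at left_end_ts can be joined once every right table
has moved past it:

#include <algorithm>
#include <cstdint>
#include <vector>

// True once every right table has produced a timestamp past the end of the
// left batch's time range, i.e. all of (-inf, left_end_ts] has arrived.
bool CanEmitLeftBatch(int64_t left_end_ts,
                      const std::vector<int64_t>& right_max_ts_seen) {
  return std::all_of(right_max_ts_seen.begin(), right_max_ts_seen.end(),
                     [&](int64_t right_ts) { return right_ts > left_end_ts; });
}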

On Tue, Apr 26, 2022 at 1:59 PM Li Jin <ic...@gmail.com> wrote:

> Thanks both for the reply. To add a bit more context, I am trying to
> implement an "asof join". Here I have one left table and n right tables, and
> all batches arrive in time order.
>
> In order to produce an output for a left batch, I would need to wait until
> I received enough batches from the right tables to cover all potential
> matches (wait until I have seen right timestamps outside the matching range).
>
> From both replies it sounds like I should just do the check if I got
> enough data in InputReceived function and do work there when I have enough
> data.
>
> However, one thing that I am not sure about is how does the
> parallelization comes into play - it sounds like InputReceived could be
> called by multiple thread of the same input node for different batches?
> Currently I have just trying to get a baseline implementation that has one
> thread doing the join so if InputReceived is called by multiple thread I
> might ended up blocking other threads unnecessarily.
>
> If I have a dedicate thread/executor that does the join and InputReceived
> just queue the batches and return immediately, I felt like it would be more
> efficient.
>
> Thoughts?
>
> Thanks,
> Li
>
> On Mon, Apr 25, 2022 at 6:41 PM Sasha Krassovsky <
> krassovskysasha@gmail.com> wrote:
>
>> If I understand correctly, on InputReceived you’ll be accumulating
>> batches until you have enough to compute the next output? In that case, you
>> have two options: you can either just immediately compute it using the same
>> thread, or call the schedule_callback directly (not using the scheduler). I
>> think your pseudocode is correct - since whether or not you can output the
>> next batch can only change on InputReceived, that’s the only spot you need
>> to check. I think an elaboration of your pseudocode could be something like:
>>
>> Status InputReceived(Batch b)
>>     lock(accum_lock);
>>     accum.push_back(b);
>>     if(enough_inputs)
>>         vector<Batch> batches = std::move(accum);
>>         unlock(accum_lock);
>>         compute_next_output(batches);
>>     return Status::OK();
>>
>> Sasha
>>
>> > On Apr 25, 2022, at 3:29 PM, Li Jin <ic...@gmail.com> wrote:
>> >
>> > Thanks! That's super helpful.
>> >
>> > A follow up question on TaskScheduler - What's the correct way to
>> define a
>> > task that "do work if input batches are ready, otherwise try later"?
>> >
>> > Sth like
>> >
>> > Status try_process():
>> > if enough_inputs_to _produce_next_output:
>> > compute_and_produce_next_output();
>> > return Status::OK()
>> > else:
>> > # Is this right?
>> > # Exit and try later
>> > return Status::OK();
>> >
>> > If I register this function with TaskScheduler, I think it only gets run
>> > once, so I think I might need to schedule the next task when inputs are
>> not
>> > ready but I am not sure of the best way to do that. Any suggestions?
>> >
>> > Li
>> >
>> > On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <
>> krassovskysasha@gmail.com <ma...@gmail.com>>
>> > wrote:
>> >
>> >> Hi Li,
>> >> I’ll answer the questions in order:
>> >>
>> >> 1. Your guess is correct! The Hash Join may be used standalone (mostly
>> in
>> >> testing or benchmarking for now) or as part of the ExecNode. The
>> ExecNode
>> >> will pass the task to the Executor to be scheduled, or will run it
>> >> immediately if it’s in sync mode (i.e. no executor). Our Hash Join
>> >> benchmark uses OpenMP to schedule things, and passes a lambda that does
>> >> OpenMP things to the HashJoin.
>> >>
>> >> 2. We might not have an executor if we want to execute synchronously.
>> This
>> >> is set during construction of the ExecContext, which is given to the
>> >> ExecPlan during creation. If the ExecContext has a nullptr Executor,
>> then
>> >> we are in async mode, otherwise we use the Executor to schedule. One
>> >> confusing thing is that we also have a SerialExecutor - I’m actually
>> not
>> >> quite sure what the difference between using that and setting the
>> Executor
>> >> to nullptr is (might have something to do with testing?). @Weston
>> probably
>> >> knows
>> >>
>> >> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is
>> >> the function that implements the work that needs to be split up,
>> >> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl
>> >> will receive the index of the task. If you’re familiar with OpenMP,
>> it’s
>> >> equivalent to this:
>> >>
>> >> #pragma omp parallel for
>> >> for(int i = 0; i < 100; i++)
>> >> TaskImpl(omp_get_thread_num(), i);
>> >> TaskGroupContinuationImpl();
>> >>
>> >> Examples of the two are here:
>> >>
>> >>
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>> >> <
>> >>
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>> <
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>> >
>> >>>
>> >>
>> >>
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>> <
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>> >
>> >> <
>> >>
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>> <
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>> >
>> >>>
>> >>
>> >> Sasha
>> >>
>> >>> On Apr 25, 2022, at 8:35 AM, Li Jin <ic...@gmail.com> wrote:
>> >>>
>> >>> Hello!
>> >>>
>> >>> I am reading the use of TaskScheduler inside C++ compute code (reading
>> >> hash
>> >>> join) and have some questions about it, in particular:
>> >>>
>> >>> (1) What the purpose of SchedulerTaskCallback defined here:
>> >>>
>> >>
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
>> >>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
>> >>> provide an implementation of a task executor, and the implementation
>> of
>> >>> SchedulerTaskCallback inside hash_join_node.cc is just a vanillar
>> >>> implementation)
>> >>>
>> >>> (2) When would this task context not have an executor?
>> >>>
>> >>
>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
>> >>>
>> >>> (3) What's the difference between TaskImpl and
>> TaskGroupContinuationImpl
>> >> in
>> >>> TaskScheduler::RegisterTaskGroup? And how would one normally define
>> >>> TaskGroupContinuationImpl?
>> >>>
>> >>> Sorry I am still learning the Arrow compute internals and appreciate
>> help
>> >>> on understanding these.
>> >>>
>> >>> Li
>>
>>

Re: [Compute][C++] Question on compute scheduler

Posted by Li Jin <ic...@gmail.com>.
Thanks both for the reply. To add a bit more context, I am trying to
implement an "asof join". Here I have one left table and n right tables,
and all batches arrive in time order.

To produce an output for a left batch, I need to wait until I have received
enough batches from the right tables to cover all potential matches (i.e.,
until I have seen right timestamps outside the matching range).

From both replies it sounds like I should just check whether I have enough
data in the InputReceived function and do the work there once I do.

However, one thing I am not sure about is how the parallelization comes
into play - it sounds like InputReceived could be called by multiple
threads of the same input node for different batches? Currently I am just
trying to get a baseline implementation where one thread does the join, so
if InputReceived is called by multiple threads I might end up blocking
other threads unnecessarily.

If I had a dedicated thread/executor that does the join, and InputReceived
just queued the batches and returned immediately, I feel like that would be
more efficient.
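
(Roughly what I am imagining - a sketch only; Batch, JoinState, and
ProcessBatch below are hypothetical placeholders, not Arrow types:)

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

using Batch = std::vector<int64_t>;  // Placeholder for an Arrow batch type.

struct JoinState {
  std::mutex mu;
  std::condition_variable cv;
  std::queue<Batch> pending;
  bool finished = false;
};

void ProcessBatch(Batch batch) {
  // Hypothetical: the actual asof-join work for one batch would go here.
  (void)batch;
}

// Called by upstream threads: enqueue and return immediately.
void InputReceived(JoinState& state, Batch batch) {
  {
    std::lock_guard<std::mutex> guard(state.mu);
    state.pending.push(std::move(batch));
  }
  state.cv.notify_one();
}

// Runs on the single dedicated join thread.
void JoinThreadMain(JoinState& state) {
  for (;;) {
    std::unique_lock<std::mutex> lock(state.mu);
    state.cv.wait(lock, [&] { return !state.pending.empty() || state.finished; });
    if (state.pending.empty()) return;  // Finished and fully drained.
    Batch batch = std::move(state.pending.front());
    state.pending.pop();
    lock.unlock();
    ProcessBatch(std::move(batch));  // Join work happens outside the lock.
  }
}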

Thoughts?

Thanks,
Li


Re: [Compute][C++] Question on compute scheduler

Posted by Sasha Krassovsky <kr...@gmail.com>.
If I understand correctly, on InputReceived you’ll be accumulating batches until you have enough to compute the next output? In that case you have two options: either compute it immediately on the same thread, or call the schedule_callback directly (not going through the scheduler). I think your pseudocode is correct - since whether you can output the next batch can only change on InputReceived, that’s the only spot you need to check. An elaboration of your pseudocode could look something like this:

Status InputReceived(Batch b) {
    std::vector<Batch> batches;
    {
        // Hold the lock only while touching the shared accumulator.
        std::lock_guard<std::mutex> guard(accum_lock);
        accum.push_back(std::move(b));
        if (!enough_inputs) return Status::OK();  // Lock released on return.
        batches = std::move(accum);
        accum.clear();
    }
    compute_next_output(std::move(batches));
    return Status::OK();
}
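
One detail worth calling out in the sketch above: moving the accumulated
batches out and releasing the lock before calling compute_next_output means
the (potentially expensive) computation never blocks other threads that are
trying to deliver batches.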

Sasha 



Re: [Compute][C++] Question on compute scheduler

Posted by Li Jin <ic...@gmail.com>.
Thanks! That's super helpful.

A follow-up question on TaskScheduler - what's the correct way to define a
task that "does work if input batches are ready, otherwise tries again
later"?

Something like:

Status try_process():
    if enough_inputs_to_produce_next_output:
         compute_and_produce_next_output();
         return Status::OK()
    else:
         # Is this right?
         # Exit and try later
         return Status::OK();

If I register this function with TaskScheduler, I think it only gets run
once, so I think I might need to schedule the next task when inputs are not
ready, but I am not sure of the best way to do that. Any suggestions?

Li


Re: [Compute][C++] Question on compute scheduler

Posted by Sasha Krassovsky <kr...@gmail.com>.
Hi Li,
I’ll answer the questions in order:

1. Your guess is correct! The Hash Join may be used standalone (mostly in testing or benchmarking for now) or as part of an ExecNode. The ExecNode will pass the task to the Executor to be scheduled, or will run it immediately if it’s in sync mode (i.e. no executor). Our Hash Join benchmark uses OpenMP to schedule things, and passes the HashJoin a lambda that does the OpenMP scheduling.

2. We might not have an executor if we want to execute synchronously. This is set during construction of the ExecContext, which is given to the ExecPlan during creation. If the ExecContext has a nullptr Executor, then we are in sync mode; otherwise we use the Executor to schedule. One confusing thing is that we also have a SerialExecutor - I’m actually not quite sure what the difference between using that and setting the Executor to nullptr is (might have something to do with testing?). @Weston probably knows
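
For example (a sketch - the ExecContext constructor arguments are
paraphrased from memory, so double-check exec.h before relying on them):

#include "arrow/compute/exec.h"
#include "arrow/memory_pool.h"
#include "arrow/util/thread_pool.h"

// A null executor means the plan runs synchronously on the calling thread;
// passing the CPU thread pool makes it run asynchronously.
arrow::compute::ExecContext sync_ctx(arrow::default_memory_pool(),
                                     /*executor=*/nullptr);
arrow::compute::ExecContext async_ctx(arrow::default_memory_pool(),
                                      arrow::internal::GetCpuThreadPool());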

3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is the function that implements the work that needs to be split up, and TaskGroupContinuationImpl is what runs after the loop completes. TaskImpl will receive the index of the task. If you’re familiar with OpenMP, it’s equivalent to this:

#pragma omp parallel for
for(int i = 0; i < 100; i++)
    TaskImpl(omp_get_thread_num(), i);
TaskGroupContinuationImpl();

Examples of the two are here:
https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
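
A minimal sketch of the registration flow, with signatures paraphrased from
task_util.h at that commit (treat the exact types as approximate):

#include <cstdint>

#include "arrow/compute/exec/task_util.h"  // Path as of the commit above.
#include "arrow/status.h"

arrow::Status RunGroup(int64_t num_tasks) {
  auto scheduler = arrow::compute::TaskScheduler::Make();
  int group_id = scheduler->RegisterTaskGroup(
      [](size_t thread_id, int64_t task_id) {
        // "Parallel for" body: do the work for slice task_id.
        return arrow::Status::OK();
      },
      [](size_t thread_id) {
        // Continuation: runs once, after every task in the group finishes.
        return arrow::Status::OK();
      });
  scheduler->RegisterEnd();
  // NOTE: in real use, StartScheduling(...) must also be called to tell the
  // scheduler how to hand tasks to an executor; omitted here for brevity.
  return scheduler->StartTaskGroup(/*thread_id=*/0, group_id, num_tasks);
}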

Sasha
