Posted to dev@arrow.apache.org by Kenta Murata <mr...@mrkn.jp> on 2020/08/05 15:07:48 UTC

[DISCUSS][C++] Group by operation for RecordBatch and Table

Hi folks,

Red Arrow, the Ruby binding of Arrow GLib, implements grouped aggregation
features for RecordBatch and Table.  Because these features are written in
Ruby, they are too slow for large datasets.  We need to make them much
faster.

To improve their speed, these features should be rewritten in C++ and
placed in Arrow C++ instead of Red Arrow.

Is anyone working on implementing a group-by operation for RecordBatch and
Table in Arrow C++?  If no one has worked on it yet, I would like to try it.

By the way, I found that the grouped aggregation feature is mentioned in
the design document of the Arrow C++ Query Engine.  Is the Query Engine,
rather than Arrow C++ Core, the appropriate place to implement the
group-by operation?

Re: [DISCUSS][C++] Group by operation for RecordBatch and Table

Posted by Kenta Murata <mr...@mrkn.jp>.
Hi Wes,

Thank you very much for the detailed explanation of your thoughts.

I'll need to learn the state-of-the-art query-engine techniques you
pointed out before I can contribute to the C++ Query Engine, or even just
write bindings for it.  I'm studying the article and the code.

Regards,
Kenta Murata

-- 
Kenta Murata, sent from Gmail Mobile

Re: [DISCUSS][C++] Group by operation for RecordBatch and Table

Posted by Wes McKinney <we...@gmail.com>.
I see there's a bunch of additional aggregation code in Dremio that
might serve as inspiration (some of which is related to distributed
aggregation, so may not be relevant)

https://github.com/dremio/dremio-oss/tree/master/sabot/kernel/src/main/java/com/dremio/sabot/op/aggregate

Maybe Andy or one of the other active Rust DataFusion developers can
comment on the approach taken for hash aggs there


Re: [DISCUSS][C++] Group by operation for RecordBatch and Table

Posted by Wes McKinney <we...@gmail.com>.
hi Kenta,

Yes, I think it only makes sense to implement this in the context of
the query engine project. Here's a list of assorted thoughts about it:

* I have been mentally planning to follow the Vectorwise-type query
engine architecture that's discussed in [1] [2] and many other
academic papers. I believe this is how some other current generation
open source columnar query engines work, such as Dremio [3] and DuckDB
[4][5].
* Hash (aka "group") aggregations need to be able to process arbitrary
expressions, not only a plain input column. So it's not enough to be
able to compute "sum(x) GROUP BY y" where "x" and "y" are fields in a
RecordBatch; we need to be able to compute "$AGG_FUNC($EXPR) GROUP BY
$GROUP_EXPR_1, $GROUP_EXPR_2, ..." where $EXPR / $GROUP_EXPR_1 / ...
are any column expressions computed from the input relations (keep in
mind that an aggregation could apply to a stream of record batches
produced by a join). In any case, expression evaluation is a
closely related task and should be implemented ASAP.
* Hash aggregation functions themselves should probably be introduced
as a new Function type in arrow::compute. I don't think it would be
appropriate to reuse the existing "SCALAR_AGGREGATE" functions; instead,
we should introduce a new HASH_AGGREGATE function type that accepts the
input data to be aggregated along with an array of pre-computed bucket
ids (computed by probing the hash table). So rather than
Update(state, args) as for scalar aggregates, the primary
interface for group aggregation is Update(state, bucket_ids, args)
* The HashAggregation operator should be able to process an arbitrary
iterator of record batches
* We will probably want to adapt an existing or implement a new
concurrent hash table so that aggregations can be performed in
parallel without requiring a post-aggregation merge step
* There's some general support machinery for hashing multiple fields
and then doing efficient vectorized hash table probes (to assign
aggregation bucket ids to each row position)

I think it is worth investing the effort to build something that is
reasonably consistent with the "state of the art" in database systems
(at least according to what we are able to build with our current
resources) rather than building something more crude that has to be
replaced with a new implementation later.

I'd like to help personally with this work (particularly since the
natural next step with my recent work in arrow/compute is to implement
expression evaluation) but I won't have significant bandwidth for it
until later this month or early September. If someone feels that they
sufficiently understand the state of the art for this type of workload
and wants to help with laying down the abstract C++ APIs for
Volcano-style query execution and an implementation of hash
aggregation, that sounds great.

Thanks,
Wes

[1]: https://www.vldb.org/pvldb/vol11/p2209-kersten.pdf
[2]: https://github.com/TimoKersten/db-engine-paradigms
[3]: https://github.com/dremio/dremio-oss/tree/master/sabot/kernel/src/main/java/com/dremio/sabot/op/aggregate/hash
[4]: https://github.com/cwida/duckdb/blob/master/src/include/duckdb/execution/aggregate_hashtable.hpp
[5]: https://github.com/cwida/duckdb/blob/master/src/execution/aggregate_hashtable.cpp
