Posted to dev@arrow.apache.org by Andrew Lamb <al...@influxdata.com> on 2020/08/21 21:37:04 UTC

[Rust] [DataFusion] Proposal for User Defined PlanNode / Operator API

I would like to propose and request comments from the DataFusion community
on adding user defined LogicalPlanNodes.

A detailed proposal is in this Google Doc
<https://docs.google.com/document/d/1IHCGkCuUvnE9BavkykPULn6Ugxgqc1JShT4nz1vMi7g/edit#>
(comments enabled; copy/pasted below for your convenience).

Here is a PR showing how the approach could work:
https://github.com/apache/arrow/pull/8020
<https://github.com/apache/arrow/pull/8020#pullrequestreview-472829313>

Thanks!
Andrew




Proposal for User Defined PlanNode / Operator API

August 21, 2020

Andrew Lamb

This document is a high level proposal / request for comments from the
DataFusion community on adding user defined LogicalPlanNodes.

Problem Statement / Rationale

We are contemplating building a new query engine for a time series related
engine using DataFusion. To do so, we will likely need domain specific
optimizations that are too specialized to be appropriate for a general
purpose engine such as DataFusion.

Examples of the kinds of optimizations we are thinking of:

   1. Push down (certain) filter predicates and aggregates into the actual
      scan over specialized storage structures.

   2. Specialized time series specific aggregates that rely on the order of
      the input rows, such as first/last.
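
To illustrate why an aggregate such as first/last depends on the order of its
input, here is a minimal sketch. It is not DataFusion code: the row layout
(a `(timestamp, value)` tuple) and both function names are assumptions made
for this example. When the rows are known to arrive sorted by time, first/last
is a constant-time lookup; otherwise the aggregate has to scan for the minimum
and maximum timestamps.

```rust
/// A row is (timestamp, value). Hypothetical layout for illustration only.
type Row = (i64, f64);

/// Assumes `rows` is already sorted by timestamp, so the answer is simply
/// the first and last element.
fn first_last_sorted(rows: &[Row]) -> Option<(f64, f64)> {
    Some((rows.first()?.1, rows.last()?.1))
}

/// Makes no ordering assumption: scans for the smallest and largest
/// timestamp and returns their values.
fn first_last_unsorted(rows: &[Row]) -> Option<(f64, f64)> {
    let first = rows.iter().min_by_key(|row| row.0)?;
    let last = rows.iter().max_by_key(|row| row.0)?;
    Some((first.1, last.1))
}
```

On sorted input both functions agree. On unsorted input only the second one is
correct, which is why a planner needs to know about ordering before it can pick
the cheap version.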

Proposed Solution

I propose changing LogicalPlan
<https://github.com/apache/arrow/blob/master/rust/datafusion/src/logicalplan.rs#L773>
nodes from an enum of structs, which must be defined in the DataFusion
source code, to a tree of `dyn LogicalPlanNode` trait objects. Here is a PR
that demonstrates how such an approach could work:
https://github.com/apache/arrow/pull/8020
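
To make the contrast concrete, here is a minimal sketch of the two shapes. It
is not the code from the PR: the trait methods `name` and `inputs`, the
`EnumPlan` type, and the `Scan` and `TimeSeriesFirst` nodes are simplified
assumptions for illustration only.

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Closed design: every node kind is a variant that must live in the library.
#[derive(Debug)]
enum EnumPlan {
    Scan { table: String },
    Filter { predicate: String, input: Box<EnumPlan> },
}

/// Walking the enum requires a `match` that knows every variant up front.
fn describe_enum(plan: &EnumPlan) -> Vec<String> {
    match plan {
        EnumPlan::Scan { table } => vec![format!("Scan({})", table)],
        EnumPlan::Filter { predicate, input } => {
            let mut out = vec![format!("Filter({})", predicate)];
            out.extend(describe_enum(input));
            out
        }
    }
}

/// Open design (hypothetical, simplified): any crate can implement the trait.
trait LogicalPlanNode: Debug {
    /// Human readable operator name.
    fn name(&self) -> String;
    /// Child plans; empty for leaf nodes.
    fn inputs(&self) -> Vec<Arc<dyn LogicalPlanNode>>;
}

/// A node that could ship with the library.
#[derive(Debug)]
struct Scan {
    table: String,
}

impl LogicalPlanNode for Scan {
    fn name(&self) -> String {
        format!("Scan({})", self.table)
    }
    fn inputs(&self) -> Vec<Arc<dyn LogicalPlanNode>> {
        vec![]
    }
}

/// A node defined by a user of the library, outside its source tree.
#[derive(Debug)]
struct TimeSeriesFirst {
    input: Arc<dyn LogicalPlanNode>,
}

impl LogicalPlanNode for TimeSeriesFirst {
    fn name(&self) -> String {
        "TimeSeriesFirst".to_string()
    }
    fn inputs(&self) -> Vec<Arc<dyn LogicalPlanNode>> {
        vec![Arc::clone(&self.input)]
    }
}

/// Generic pre-order walk: it only uses the trait, so it works for
/// user defined nodes without being changed.
fn describe(node: &dyn LogicalPlanNode) -> Vec<String> {
    let mut out = vec![node.name()];
    for child in node.inputs() {
        out.extend(describe(&*child));
    }
    out
}
```

With the enum, adding `TimeSeriesFirst` would mean editing `EnumPlan` and every
`match` over it. With the trait, `describe` accepts the new node as is.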

The primary benefit of such a design over the existing enum of structs is
that users of the DataFusion library can write their own user defined
LogicalPlan nodes and still take advantage of general purpose logic such as
predicate push down.

A downside of this design is that it will isolate the logic for the
treatment of each type of LogicalPlanNode into its own module for that plan
node. This means that algorithms over LogicalPlan nodes (e.g. predicate
pushdown) will no longer have any node type specific logic in them, which
could make them harder to reason about.
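
As a rough sketch of what this looks like in practice, the pushdown algorithm
below contains no node type specific logic; each node answers a question about
itself through a trait method instead. The method name
`pushdown_blocked_columns`, its signature, and the two example nodes are
assumptions for illustration and are not taken from the PR. Treating a
projection as never blocking pushdown is also a simplification.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified trait: only the hook needed for this example.
trait LogicalPlanNode {
    /// Columns whose predicates must NOT be pushed below this node.
    /// By default nothing is blocked.
    fn pushdown_blocked_columns(&self) -> HashSet<String> {
        HashSet::new()
    }
}

/// Uses the default: any predicate may pass through (a simplification).
struct Projection;

impl LogicalPlanNode for Projection {}

/// Predicates on aggregate outputs cannot be evaluated before aggregation,
/// so this node blocks them.
struct Aggregate {
    aggregate_outputs: Vec<String>,
}

impl LogicalPlanNode for Aggregate {
    fn pushdown_blocked_columns(&self) -> HashSet<String> {
        self.aggregate_outputs.iter().cloned().collect()
    }
}

/// Generic rule: a predicate can move below `node` only if none of the
/// columns it references are blocked by that node.
fn can_push_below(node: &dyn LogicalPlanNode, predicate_columns: &[&str]) -> bool {
    let blocked = node.pushdown_blocked_columns();
    predicate_columns.iter().all(|col| !blocked.contains(*col))
}
```

The rule for Aggregate now lives in the Aggregate implementation rather than
in `can_push_below`, so understanding the optimizer's behavior for one node
means reading that node's module as well as the generic pass.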

Prior Work:

I am not familiar enough with the Spark codebase to really understand
how/if Spark allows this, but I think the high level idea is that the
catalyst
<https://github.com/apache/spark/tree/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst>
library provides a similar interface called LogicalPlan
<https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L29>
that defines the operators that are available in Spark SQL.

Alternates considered:

One alternate design we could pursue is to extend the existing
TableProvider trait to support more sophistication (e.g. predicate
pushdown). However, adding specialized aggregation operations to the
TableProvider trait seems like it would be confusing to most users, who do
not have a specialized need to push partial aggregations into the raw scans.

Another alternate design that we could use on our project is to maintain a
fork of the DataFusion code base and simply add our own plan nodes directly
to our fork. We would prefer to avoid this as it will be more expensive to
maintain and we think the user defined API would likely be valuable to
others in the community.

Re: [Rust] [DataFusion] Proposal for User Defined PlanNode / Operator API

Posted by Andrew Lamb <al...@influxdata.com>.
Cool -- thanks Jorge and Andy. I'll start working on an actual proposed PR
to merge. I really like Jorge's idea of trying to port the Aggregate
operator to use a LogicalPlan Node style and see how easy that would be.

Andrew


Re: [Rust] [DataFusion] Proposal for User Defined PlanNode / Operator API

Posted by Andy Grove <an...@gmail.com>.
I've added some comments as well. I fully support converting LogicalPlan
from enum to trait. You will notice that I implemented PhysicalPlan using
traits, because I had more Rust experience at the time I did this work.

There are going to be some design challenges in making this change, I'm sure,
but I think we can work through them.

Thanks,

Andy.

Re: [Rust] [DataFusion] Proposal for User Defined PlanNode / Operator API

Posted by Jorge Cardoso Leitão <jo...@gmail.com>.
Hi Andrew,

I carefully went through the document and the PR. Thank you for this!

I believe that the improvements in the PR alone are a major benefit, as it
supports custom logical plans out of the box, which opens up a lot of
possibilities for users.

I also like the idea of migrating from enum to a trait `LogicalPlanNode`
for the advantages that you mentioned. I am undecided whether we should go
for it, because we may have to trade off node-specific optimizations. I
currently can't tell whether that trade-off really exists, or whether we
are able to add trait methods (such as
`prevent_predicate_push_down_columns` in your PR) that allow us to achieve
full optimizations. I would need to implement this for some of our nodes to
check whether we can land on a good end result, which of course entails the
risk that we may scrap it if we can't arrive at one. Maybe @Andy Grove
<an...@gmail.com> or others can formulate a better judgment call here.

In summary, I very much agree with the content of the proposal, but I
currently do not have sufficient information to evaluate the tradeoffs. One
concrete path is to try to migrate a difficult node, e.g. Aggregate, and
see how difficult it is to make our current tests pass.

Best,
Jorge

