Posted to dev@arrow.apache.org by Jorge Cardoso Leitão <jo...@gmail.com> on 2020/08/17 16:30:48 UTC

Polymorphism in DataFusion

Hi,

Recently, I have been contributing to DataFusion, and I would like to bring
to your attention a question that I faced while PRing to DataFusion that
IMO needs some alignment :)

DataFusion supports scalar UDFs: functions that expect a type, return a
type, and perform some operation on the data (a-la spark UDF). However,
the execution engine is actually dynamically typed:

* a scalar UDF receives an &[ArrayRef] that must be downcast accordingly
* a scalar UDF must select the builder that matches its signature, so that
its return type matches the ArrayRef that it returns.

This suggests that we can treat functions as polymorphic: as long as the
function handles the different types (e.g. via match), we are good. However,
we currently support neither multiple input types nor variable return types
in UDF signatures.
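A minimal sketch of what "dynamically typed" means for a scalar UDF. This is a toy model: the `Array` enum stands in for arrow's ArrayRef, and `sqrt_udf` is a hypothetical function, not the real arrow or DataFusion API. The point is only that the type is inspected at runtime and the output is built to match it.

```rust
// Toy model only: `Array` stands in for arrow's dynamically typed ArrayRef.
// None of these names belong to the real arrow or DataFusion API.
#[derive(Debug, Clone, PartialEq)]
enum Array {
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Utf8(Vec<String>),
}

// A scalar UDF receives a slice of dynamically typed arrays, inspects the
// variant at runtime, and builds an output array of the matching type.
fn sqrt_udf(args: &[Array]) -> Result<Array, String> {
    match args {
        [Array::Float32(values)] => {
            Ok(Array::Float32(values.iter().map(|v| v.sqrt()).collect()))
        }
        [Array::Float64(values)] => {
            Ok(Array::Float64(values.iter().map(|v| v.sqrt()).collect()))
        }
        // Exactly one argument, but of a type this function cannot handle.
        [other] => Err(format!("sqrt does not support {:?}", other)),
        // Wrong number of arguments.
        _ => Err(format!("sqrt expects 1 argument, got {}", args.len())),
    }
}
```

Calling `sqrt_udf` with a single `Array::Float32` yields an `Array::Float32`, and with an `Array::Float64` yields an `Array::Float64`; any other input is rejected with an error rather than a panic.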

Our current (non-udf) scalar and aggregate functions are already
polymorphic on both their input and return type: sum(i32) -> i64, sum(f64)
-> f64, "a + b". I have been working on PRs to add polymorphic support
to scalar UDFs (e.g. sqrt() can take float32 and float64) [1,3], as well as
polymorphic aggregate UDFs [2], so that we can extend our offering to more
interesting functions such as "length(t) -> uint", "array(c1, c2)",
"collect_list(t) -> array(t)", etc.

However, while working on [1,2,3], I reached some non-trivial findings that
I would like to share:

Finding 1: to support polymorphic functions, our logical and physical
expressions (Expr and PhysicalExpr) need to be polymorphic as well: once a
function is polymorphic, any expression containing it is also polymorphic.
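Finding 1 can be illustrated with a small sketch, assuming a toy expression tree (the `DataType` and `Expr` enums and the `data_type` function below are hypothetical, not DataFusion's types). The output type of every node is resolved from its inputs during planning, so a single polymorphic function makes every enclosing expression depend on the scan's schema. The rule sum(i32) -> i64 and sum(f64) -> f64 is taken from the text above; the other sum cases are assumptions of this sketch.

```rust
// Toy model only: these types are illustrative, not DataFusion's Expr.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int32,
    Int64,
    Float32,
    Float64,
}

#[derive(Debug, Clone)]
enum Expr {
    // A column whose type comes from the scan's schema.
    Column(&'static str, DataType),
    // sqrt(expr): returns the same float type that it receives.
    Sqrt(Box<Expr>),
    // sum(expr): sum(i32) -> i64 and sum(f64) -> f64, as in the text.
    Sum(Box<Expr>),
}

// Resolve the output type of an expression during planning. The result of
// an outer node depends on the types resolved for its inner nodes.
fn data_type(expr: &Expr) -> Result<DataType, String> {
    match expr {
        Expr::Column(_, t) => Ok(*t),
        Expr::Sqrt(inner) => match data_type(inner)? {
            DataType::Float32 => Ok(DataType::Float32),
            DataType::Float64 => Ok(DataType::Float64),
            other => Err(format!("sqrt does not support {:?}", other)),
        },
        Expr::Sum(inner) => match data_type(inner)? {
            DataType::Int32 => Ok(DataType::Int64),
            // Assumption of this sketch: other types are returned unchanged.
            other => Ok(other),
        },
    }
}
```

With a Float32 column, `sum(sqrt(x))` resolves to Float32; with a Float64 column the same expression resolves to Float64, without any change to the expression itself.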

Finding 2: when a polymorphic expression passes through our type coercer
optimizer (that tries to coerce types to match a function's signature), it
may be re-cast to a different type. If the return type changes, the
optimizer may need to re-cast operations that depend on the function call
(e.g. a projection followed by an aggregation may need a recast on the
projection and on the aggregation).

Finding 3: when an expression passes through our type coercer optimizer and
is re-cast, its name changes (typically from "expr" to "CAST(expr as
X)"). This implies that a column referenced as #expr down the plan may not
exist depending on the input type of the initial projection/scan.

Findings 1 and 2 IMO are a direct consequence of polymorphism and the only
way to not handle them is by not supporting polymorphism (e.g. the user
registers sqrt_f32 and sqrt_f64, etc.).

Finding 3 can be addressed in at least three ways:

A) make the optimizer rewrite the expression as "CAST(expr as X) AS expr",
so that it retains its original name. This hides the actual expression's
calculation, but preserves its original name.
B) accept that expressions can always change their name, which means that
the user should be mindful when writing `col("SELECT sqrt(x) FROM t")`, as
the column may end up being called `"sqrt(CAST(x as X))"`.
C) Do not support polymorphic functions
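Finding 3 and option A can be sketched with a toy expression tree. All names here (`Expr`, `name`, `coerce`, `coerce_keep_name`) are hypothetical and only model the idea; they are not DataFusion's optimizer. A coercion that wraps an argument in a cast changes the derived column name, while aliasing the rewritten expression back to its original name keeps downstream references valid.

```rust
// Toy model only: illustrative names, not DataFusion's Expr or optimizer.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Cast(Box<Expr>, String),  // CAST(expr as type)
    Sqrt(Box<Expr>),          // sqrt(expr)
    Alias(Box<Expr>, String), // expr AS name
}

// The column name that an expression produces in the output schema.
fn name(expr: &Expr) -> String {
    match expr {
        Expr::Column(c) => c.clone(),
        Expr::Cast(e, t) => format!("CAST({} as {})", name(e), t),
        Expr::Sqrt(e) => format!("sqrt({})", name(e)),
        Expr::Alias(_, alias) => alias.clone(),
    }
}

// Stand-in for the type coercion pass: wrap sqrt's argument in a cast.
fn coerce(expr: &Expr, to: &str) -> Expr {
    match expr {
        Expr::Sqrt(arg) => {
            Expr::Sqrt(Box::new(Expr::Cast(arg.clone(), to.to_string())))
        }
        other => other.clone(),
    }
}

// Option A: coerce, then alias the result back to the original name.
fn coerce_keep_name(expr: &Expr, to: &str) -> Expr {
    let original = name(expr);
    Expr::Alias(Box::new(coerce(expr, to)), original)
}
```

Under option B the plan would use `coerce` directly and any later reference to the column would need to use the new name; under option A the name stays stable, at the cost of hiding the cast from whoever reads the schema.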

Note that we already experience effects 1-3; it is just that we use so few
polymorphic functions that these seldom present themselves. It was while
working on [1,2,3] that I started to see the bigger picture.

Some questions:
1. should we continue down the path of polymorphic functions?
2. if yes, how do we handle finding 3?

Looking at the current code base, I am confident that we can address the
technical issues to support polymorphic functions. However, it would be
interesting to have your thoughts on this.

[1] https://github.com/apache/arrow/pull/7967
[2] https://github.com/apache/arrow/pull/7971
[3] https://github.com/apache/arrow/pull/7974

Re: Polymorphism in DataFusion

Posted by Andy Grove <an...@gmail.com>.
Yes, this matches my understanding as well. Thanks, Jorge.

On Fri, Aug 21, 2020 at 4:11 PM Andrew Lamb <al...@influxdata.com> wrote:

> Thanks for writing that up Jorge -- I read the documents and left some
> comments, but in general I would say this matches my personal understanding
> of the design of DataFusion and where I think it should head.
>
> On Fri, Aug 21, 2020 at 4:41 PM Jorge Cardoso Leitão <
> jorgecarleitao@gmail.com> wrote:
>
> > Hi everyone,
> >
> > Got it. I agree that we should aim for a proposal.
> >
> > As an exercise, I wrote some personal notes
> > <https://docs.google.com/document/d/1Asnz29uUS1t60QNbNBU9SiME274rja-hcDvX_RDraFU/edit?usp=sharing>
> > about DataFusion's notions and invariants, as they form the basis for any
> > proposal.
> > I would be interested in knowing how far these are from how you see
> > DataFusion.
> >
> > Have a great weekend everyone,
> > Jorge
> >
> >
> > On Wed, Aug 19, 2020 at 11:48 PM Andrew Lamb <al...@influxdata.com> wrote:
> >
> > > I think B) is closer to what I was thinking.
> > >
> > > We may be using the terms statically and dynamically typed a little
> > > differently -- I am sorry for the confusion. I have somewhat lost track
> > > of exactly what we are proposing and for that I apologize.
> > >
> > > I propose a next step of sketching out a proposed API for DataFusion
> > > UDFs to implement, and circulate that around for commentary. I don't
> > > think I will have time to do this any time soon (unless it becomes
> > > directly important for the project I am working on).
> > >
> > > Thanks for taking the initiative on this,
> > > Andrew
> > >
> > > On Wed, Aug 19, 2020 at 2:29 PM Jorge Cardoso Leitão <
> > > jorgecarleitao@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > Thank you for this enlightening discussion, Andrew!
> > > >
> > > > So, just to make sure I understood, are you proposing A), B) or
> > > > something else?
> > > >
> > > > A) we should not accept / declare polymorphic operations: all types
> > > > should be known based on the operation name (e.g. sum_f32, plus_f32,
> > > > etc.)
> > > > B) we should continue to have "sum", "count", "+", etc. as
> > > > polymorphic operations, but we should not allow registering udfs as
> > > > polymorphic, neither internally nor externally. I.e. all polymorphic
> > > > operations are hard-coded.
> > > >
> > > > Let's assume A) first. I relate to the sentiment that Rust is
> > > > statically typed. However, as I see it, DataFusion is not: our main
> > > > traits are arrow::array::Array and RecordBatch, which are both
> > > > dynamically typed (e.g. Array::{data_type,as_any} and RecordBatch).
> > > > Since all ops are also dynamically typed (they receive Arc<Array> or
> > > > RecordBatch) and use runtime reflection via `match array.data_type()`
> > > > at the physical level to downcast Array to its respective native
> > > > type, wouldn't A) lead to a major change in DataFusion?
> > > >
> > > > Let's now assume B), and let me try to expand on your 3 points:
> > > >
> > > > 1. Once an operation in our plan is polymorphic, the whole plan is
> > > > polymorphic and the final schema can only be inferred from the
> > > > initial projection's schema / scan. A simple example of this using
> > > > only functions that we currently support is:
> > > >
> > > > df = scan([c1 float32, c2 float64, c3 float64])
> > > > df = df.select(c1 * c2 as sum12, c1 * c3 as sum13)
> > > > df = df.aggregate(MIN(sum12), MIN(sum13))
> > > >
> > > > The plan for this is that the first product returns a float32 (lower
> > > > precision of both), and the second returns a float64. MIN's return
> > > > type now depends on the first select's return type, which is in a
> > > > previous node. So, even if only our internal ops are polymorphic,
> > > > this is sufficient to require our optimizers to handle dynamically
> > > > typed expressions and schemas whose type is only known during
> > > > planning (after the scan's schema is known).
> > > >
> > > > 2. I relate to that sentiment. About the same time Andy proposed the
> > > > dynamically typed UDFs (now our UDFs), I made a 1k+ proposal for
> > > > statically typed UDFs. In retrospect, IMO dynamically typed UDFs are
> > > > a far superior offering as they offer enormous flexibility at no
> > > > additional cost: we could offer users an interface with fixed types
> > > > only (e.g. via a macro), but, in the end, all our memory structures
> > > > are dynamically typed anyway (Array, RecordBatch, etc.), and thus,
> > > > whether done by the user or by us, a downcast will still need to take
> > > > place at runtime.
> > > >
> > > > 3. Users are still able to specify the type they want in query
> > > > languages that support polymorphic functions such as postgres, both
> > > > at the query level and on polymorphic UDFs. Most dialects support
> > > > cast operations that allow users to narrow types (::float in
> > > > postgres, CAST(x AS float64) in spark), that are only physically
> > > > executed if needed.
> > > >
> > > > So, to summarize my thoughts so far:
> > > >
> > > > i) DataFusion is dynamically typed by design
> > > > ii) We already support dynamically typed scalar UDFs
> > > > iii) we currently have polymorphic functions (internally) and already
> > > > have to deal with them on our logical and physical plans.
> > > > iv) there is no practical limitation to supporting polymorphic UDFs;
> > > > it is a matter of whether the benefits outweigh the development and
> > > > maintenance costs.
> > > >
> > > > I am inclined to say that given i-iii), we should support polymorphic
> > > > (scalar and agg) UDFs, which would put us on the same level of UDF
> > > > support as postgres. However, we should offer a very easy interface
> > > > for users to register a non-polymorphic UDF, e.g.
> > > >
> > > > ctx.register(name, udf(callable, arg_types, return_type)?)?
> > > >
> > > > where udf returns the specialization of a generic UDF that expects N
> > > > types and returns return_type.
> > > >
> > > > Best,
> > > > Jorge
> > > >
> > > >
> > > > On Tue, Aug 18, 2020 at 6:52 PM Andrew Lamb <al...@influxdata.com> wrote:
> > > >
> > > > > It is my personal opinion that actual UDF functions registered with
> > > > > DataFusion should take a known set of input types and a single
> > > > > return type (e.g. sum_i32 --> i32). I think this would:
> > > > > 1. Simplify the implementation of both the DataFusion optimizer and
> > > > > the UDFs
> > > > > 2. Make it easier for UDF writers as the UDF code would look more
> > > > > like Rust: the types would be clear from the function signatures,
> > > > > as is the case in Rust in general
> > > > > 3. Give the user of SQL / DataFrames the ability to specify exactly
> > > > > what types they want
> > > > >
> > > > > If we wanted the ability for the user to specify `sum(i)` and let
> > > > > the type coercion pass pick `sum_i32` or `sum_i64` depending on the
> > > > > input types, I recommend doing that at a different level than the
> > > > > UDF (perhaps via `register_alias("sum", "sum_i32")` or something),
> > > > > again for both clarity of DataFusion implementation as well as UDF
> > > > > specification.
> > > > >
> > > > > Andrew
> > > > >
> > > > > On Mon, Aug 17, 2020 at 4:52 PM Jorge Cardoso Leitão <
> > > > > jorgecarleitao@gmail.com> wrote:
> > > > >
> > > > > > Thanks Andrew,
> > > > > >
> > > > > > I am not sure I articulated this well enough, though, as I did
> > > > > > not specify the type of polymorphism that I was thinking about. xD
> > > > > >
> > > > > > My question was/is about whether we should accept functions whose
> > > > > > return type is known during planning, and constant during
> > > > > > execution, or whether their return types must be constant both
> > > > > > during planning and execution. I do not think we should support
> > > > > > variable types during execution for the reasons that you
> > > > > > enumerated. If by runtime polymorphism you mean changing types
> > > > > > during execution, then I very much agree with you that that is a
> > > > > > no-no.
> > > > > >
> > > > > > During planning, though, we have options: should we allow users
> > > > > > to write something like `my_operation(f32|f64) -> (f32|f64)`, in
> > > > > > which the type is inferred after we know the function's input in
> > > > > > the logical plan, or should we not allow that and require users
> > > > > > to register `my_operation_f32(f32)` and `my_operation_f64(f64)`
> > > > > > separately? The three findings that I mentioned above refer to
> > > > > > planned polymorphism: return type is resolved during planning
> > > > > > (and constant during execution).
> > > > > >
> > > > > > The biggest use-case IMO for polymorphism during planning is for
> > > > > > functions that yield structures/lists of values (a-la
> > > > > > collect_list) whose type can only be inferred after we know the
> > > > > > function's input type (array(f32) vs array(f64)), and whose
> > > > > > implementation can be generalized via a macro + match.
> > > > > >
> > > > > > From a technical point of view, we currently have functions with
> > > > > > variable types (all binary operators' return type depends on the
> > > > > > lhs' type, sum, max/min, etc.), and we have to handle the main
> > > > > > planning challenges already. In this context, the questions are
> > > > > > something like:
> > > > > >
> > > > > > 1. should we continue to have them or should we move away from
> > > > > > them?
> > > > > > 2.1 If not, what should we do with them? E.g. declare sum_i32,
> > > > > > sum_i64, etc., that have a single return type?
> > > > > > 2.2 if yes, should we allow users to register these types of
> > > > > > functions, or should these only be allowed within DataFusion's
> > > > > > code base?
> > > > > >
> > > > > > Best,
> > > > > > Jorge
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Aug 17, 2020 at 9:53 PM Andrew Lamb <alamb@influxdata.com> wrote:
> > > > > >
> > > > > > > In my opinion, I suggest we do not continue down the path of
> > > > > > > (runtime) polymorphic functions unless a compelling use case
> > > > > > > for them can be articulated.
> > > > > > >
> > > > > > > You have done a great job articulating some of the
> > > > > > > implementation challenges, but I personally struggle to
> > > > > > > describe when, as a user of DataFusion, I would want to write a
> > > > > > > (runtime) polymorphic function.
> > > > > > >
> > > > > > > A function with runtime polymorphism I think would mean the UDF
> > > > > > > could handle the type changing *at runtime*: record batches
> > > > > > > could come in with multiple different types during the same
> > > > > > > execution. I can't think of examples where this behavior would
> > > > > > > be desirable or necessary.
> > > > > > >
> > > > > > > The existing DataFusion codebase seems to assume (reasonably in
> > > > > > > my opinion) that the schema of each Logical / Physical plan
> > > > > > > node is known at planning time and it does not change at
> > > > > > > runtime.
> > > > > > >
> > > > > > > Most query optimizers (and compilers for that matter) take
> > > > > > > advantage of plan (compile) time type information to make
> > > > > > > runtime more efficient. Also, it seems like other database /
> > > > > > > runtime systems such as mysql[1] and postgres[2] require the
> > > > > > > UDF creator to explicitly specify the return type as well. I
> > > > > > > think we should consider the simpler semantics of "1 return
> > > > > > > type for each UDF" to make it easier on people writing UDFs as
> > > > > > > well as simplifying the implementation of DataFusion itself.
> > > > > > >
> > > > > > > Andrew
> > > > > > >
> > > > > > > [1] https://dev.mysql.com/doc/refman/8.0/en/create-function-udf.html
> > > > > > > [2] https://www.postgresql.org/docs/12/sql-createfunction.html

Re: Polymorphism in DataFusion

Posted by Andrew Lamb <al...@influxdata.com>.
Thanks for writing that up Jorge -- I read the documents and left some
comments, but in general I would say this matches my personal understanding
of the design of DataFusion and where I think it should head.

> > > > > > > to your attention a question that I faced while PRing to
> > DataFusion
> > > > > that
> > > > > > > IMO needs some alignment :)
> > > > > > >
> > > > > > > DataFusion supports scalar UDFs: functions that expect a type,
> > > > return a
> > > > > > > type, and performs some operation on the data (a-la spark UDF).
> > > > > However,
> > > > > > > the execution engine is actually dynamically typed:
> > > > > > >
> > > > > > > * a scalar UDF receives an &[ArrayRef] that must be downcasted
> > > > > > accordingly
> > > > > > > * a scalar UDF must select the builder that matches its
> > signature,
> > > so
> > > > > > that
> > > > > > > its return type matches the ArrayRef that it returns.
> > > > > > >
> > > > > > > This suggests that we can treat functions as polymorphic: as
> long
> > > as
> > > > > the
> > > > > > > function handles the different types (e.g. via match), we are
> > good.
> > > > We
> > > > > > > currently do not support multiple input types nor variable
> return
> > > > types
> > > > > > in
> > > > > > > their function signatures.
> > > > > > >
> > > > > > > Our current (non-udf) scalar and aggregate functions are
> already
> > > > > > > polymorphic on both their input and return type: sum(i32) ->
> i64,
> > > > > > sum(f64)
> > > > > > > -> f64, "a + b". I have been working on PRs to support
> > polymorphic
> > > > > > support
> > > > > > > to scalar UDFs (e.g. sqrt() can take float32 and float64)
> [1,3],
> > as
> > > > > well
> > > > > > as
> > > > > > > polymorphic aggregate UDFs [2], so that we can extend our
> > offering
> > > to
> > > > > > more
> > > > > > > interesting functions such as "length(t) -> uint", "array(c1,
> > c2)",
> > > > > > > "collect_list(t) -> array(t)", etc.
> > > > > > >
> > > > > > > However, while working on [1,2,3], I reach some non-trivial
> > > findings
> > > > > > that I
> > > > > > > would like to share:
> > > > > > >
> > > > > > > Finding 1: to support polymorphic functions, our logical and
> > > physical
> > > > > > > expressions (Expr and PhysicalExpr) need to be polymorphic
> > as-well:
> > > > > once
> > > > > > a
> > > > > > > function is polymorphic, any expression containing it is also
> > > > > > polymorphic.
> > > > > > >
> > > > > > > Finding 2: when a polymorphic expression passes through our
> type
> > > > > coercer
> > > > > > > optimizer (that tries to coerce types to match a function's
> > > > signature),
> > > > > > it
> > > > > > > may be re-casted to a different type. If the return type
> changes,
> > > the
> > > > > > > optimizer may need to re-cast operations dependent of the
> > function
> > > > call
> > > > > > > (e.g. a projection followed by an aggregation may need a recast
> > on
> > > > the
> > > > > > > projection and on the aggregation).
> > > > > > >
> > > > > > > Finding 3: when an expression passes through our type coercer
> > > > optimizer
> > > > > > and
> > > > > > > is re-casted, its name changes (typically from "expr" to
> > "CAST(expr
> > > > as
> > > > > > > X)"). This implies that a column referenced as #expr down the
> > plan
> > > > may
> > > > > > not
> > > > > > > exist depending on the input type of the initial
> projection/scan.
> > > > > > >
> > > > > > > Finding 1 and 2 IMO are a direct consequence of polymorphism
> and
> > > the
> > > > > only
> > > > > > > way to not handle them is by not supporting polymorphism (e.g.
> > the
> > > > user
> > > > > > > registers sqrt_f32 and sqrt_f64, etc).
> > > > > > >
> > > > > > > Finding 3 can be addressed in at least three ways:
> > > > > > >
> > > > > > > A) make the optimizer rewrite the expression as "CAST(expr as
> X)
> > AS
> > > > > > expr",
> > > > > > > so that it retains its original name. This hides the actual
> > > > > expression's
> > > > > > > calculation, but preserves its original name.
> > > > > > > B) accept that expressions can always change its name, which
> > means
> > > > that
> > > > > > the
> > > > > > > user should be mindful when writing `col("SELECT sqrt(x) FROM
> > t"`,
> > > as
> > > > > the
> > > > > > > column name may end up being called `"sqrt(CAST(x as X))"`.
> > > > > > > C) Do not support polymorphic functions
> > > > > > >
> > > > > > > Note that we currently already experience effects 1-3, it is
> just
> > > > that
> > > > > we
> > > > > > > use so few polymorphic functions that these seldomly present
> > > > > themselves.
> > > > > > It
> > > > > > > was while working on [1,2,3] that I start painting the bigger
> > > > picture.
> > > > > > >
> > > > > > > Some questions:
> > > > > > > 1. should continue down the path of polymorphic functions?
> > > > > > > 2. if yes, how do handle finding 3?
> > > > > > >
> > > > > > > Looking at the current code base, I am confident that we can
> > > address
> > > > > the
> > > > > > > technical issues to support polymorphic functions. However, it
> > > would
> > > > be
> > > > > > > interesting to have your thoughts on this.
> > > > > > >
> > > > > > > [1] https://github.com/apache/arrow/pull/7967
> > > > > > > [2] https://github.com/apache/arrow/pull/7971
> > > > > > > [3] https://github.com/apache/arrow/pull/7974
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Polymorphism in DataFusion

Posted by Jorge Cardoso Leitão <jo...@gmail.com>.
Hi everyone,

Got it. I agree that we should aim for a proposal.

As an exercise, I wrote some personal notes
<https://docs.google.com/document/d/1Asnz29uUS1t60QNbNBU9SiME274rja-hcDvX_RDraFU/edit?usp=sharing>
about DataFusion's notions and invariants, as they form the basis for any
proposal.
I would be interested in knowing how far these are from how you here see
DataFusion.

Have a great weekend everyone,
Jorge


Re: Polymorphism in DataFusion

Posted by Andrew Lamb <al...@influxdata.com>.
I think B) is closer to what I was thinking.

We may be using the terms statically and dynamically typed a little
differently -- I am sorry for the confusion. I have somewhat lost track of
exactly what we are proposing and for that I apologize.

I propose a next step of sketching out a proposed API for DataFusion UDFs
to implement, and circulate that around for commentary. I don't think I
will have time to do this any time soon (unless it becomes directly
important for the project I am working on).

Thanks for taking the initiative on this,
Andrew

On Wed, Aug 19, 2020 at 2:29 PM Jorge Cardoso Leitão <
jorgecarleitao@gmail.com> wrote:

> Hi,
>
> Thank you for this enlightening discussion, Andrew!
>
> So, just to make sure I understood, are you proposing A), B) or something
> else?
>
> A) we should not accept / declare polymorphic operations: all types should
> be known based on the operation name (e.g. sum_f32, plus_f32, etc.)
> B) we should continue to have "sum", "count", "+", etc. as polymorphic
> operations, but we should not allow registering UDFs as polymorphic, either
> internally or externally. I.e. all polymorphic operations are hard-coded.
>
> Let's assume A) first. I relate to the sentiment that Rust is statically
> typed. However, as I see it, DataFusion is not: our main traits are
> arrow::array::Array and RecordBatch, which are both dynamically typed (e.g.
> Array::{data_type,as_any} and RecordBatch). Since all ops are also
> dynamically typed (they receive Arc<Array> or RecordBatch) and use runtime
> reflection via `match array.data_type()` at the physical level to downcast
> Array to its respective native type, wouldn't A) lead to a major change in
> DataFusion?
>
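A minimal sketch of the runtime downcast described above, using only the standard library. The `Array` trait, `Float32Array`, `Float64Array` and `DataType` below are toy stand-ins defined for illustration; they mimic the shape of `Array::{data_type,as_any}` but are not the arrow crate's types.

```rust
use std::any::Any;
use std::sync::Arc;

/// Toy stand-in for Arrow's logical type tag (an assumption for this sketch).
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Float32,
    Float64,
}

/// Toy stand-in for a dynamically typed array: the concrete type is only
/// discoverable at runtime through `data_type` and `as_any`.
trait Array {
    fn data_type(&self) -> DataType;
    fn as_any(&self) -> &dyn Any;
}

struct Float32Array(Vec<f32>);
struct Float64Array(Vec<f64>);

impl Array for Float32Array {
    fn data_type(&self) -> DataType { DataType::Float32 }
    fn as_any(&self) -> &dyn Any { self }
}

impl Array for Float64Array {
    fn data_type(&self) -> DataType { DataType::Float64 }
    fn as_any(&self) -> &dyn Any { self }
}

type ArrayRef = Arc<dyn Array>;

/// A sqrt kernel that accepts either float width: it matches on the runtime
/// type tag, downcasts to the concrete array, and returns the same type.
fn sqrt(args: &[ArrayRef]) -> Result<ArrayRef, String> {
    let arg = args.first().ok_or("sqrt expects one argument")?;
    match arg.data_type() {
        DataType::Float32 => {
            let a = arg
                .as_any()
                .downcast_ref::<Float32Array>()
                .ok_or("type tag and concrete type disagree")?;
            let out: Vec<f32> = a.0.iter().map(|v| v.sqrt()).collect();
            Ok(Arc::new(Float32Array(out)) as ArrayRef)
        }
        DataType::Float64 => {
            let a = arg
                .as_any()
                .downcast_ref::<Float64Array>()
                .ok_or("type tag and concrete type disagree")?;
            let out: Vec<f64> = a.0.iter().map(|v| v.sqrt()).collect();
            Ok(Arc::new(Float64Array(out)) as ArrayRef)
        }
    }
}
```

Whether the name is `sqrt` or `sqrt_f32`, the downcast still happens at runtime; option A) would only move the `match` from inside the function to the place where the function is selected.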
> Let's now assume B), and let me try to expand on your 3 points:
>
> 1. Once an operation in our plan is polymorphic, the whole plan is
> polymorphic and the final schema can only be inferred from the initial
> projection's schema / scan. A simple example of this using only functions
> that we currently support is:
>
> df = scan([c1 float32, c2 float64, c3 float64])
> df = df.select(c1 * c2 as sum12, c1 * c3 as sum13)
> df = df.aggregate(MIN(sum12), MIN(sum13))
>
> The plan for this is that the first product returns a float32 (lower
> precision of both), and the second returns a float64. MIN's return type now
> depends on the first select's return type, which is in a previous node. So,
> even if only our internal ops are polymorphic, this is sufficient to
> require our optimizers to handle dynamically typed expressions and schemas
> whose type is only known during planning (after the scan's schema is
> known).
>
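To make point 1 concrete, here is a small sketch of resolving expression types at planning time, once the scan's schema is known. `Expr`, `DataType` and `resolve` are hypothetical names for this illustration, not DataFusion's API, and the multiplication rule is the one stated in the message (the lower precision of the two operands), taken here as an assumption. The usage shown in the note below multiplies c2 by c3 for the second product so that the two projections resolve to different types.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Float32,
    Float64,
}

/// Logical expressions, reduced to what the example needs.
enum Expr {
    /// Index into the input schema.
    Column(usize),
    Multiply(Box<Expr>, Box<Expr>),
    Min(Box<Expr>),
}

/// Resolve the type of an expression against the input schema.
/// Nothing here runs on data: this is purely a planning-time step.
fn resolve(expr: &Expr, schema: &[DataType]) -> Result<DataType, String> {
    match expr {
        Expr::Column(i) => schema
            .get(*i)
            .copied()
            .ok_or_else(|| format!("no column {}", i)),
        Expr::Multiply(l, r) => {
            let (l, r) = (resolve(l, schema)?, resolve(r, schema)?);
            // Assumed rule: the result takes the lower precision of both sides.
            if l == DataType::Float32 || r == DataType::Float32 {
                Ok(DataType::Float32)
            } else {
                Ok(DataType::Float64)
            }
        }
        // MIN returns the type of its argument, so its type depends on
        // whatever the previous node produced.
        Expr::Min(e) => resolve(e, schema),
    }
}
```

With a schema of `[Float32, Float64, Float64]`, `MIN(c1 * c2)` resolves to `Float32` and `MIN(c2 * c3)` resolves to `Float64`; neither type can be stated before the schema of the scan is available.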
> 2. I relate to that sentiment. Around the time Andy proposed the
> dynamically typed UDFs (now our UDFs), I made a 1k+ proposal for statically
> typed UDFs. In retrospect, IMO dynamically typed UDFs are a far superior
> offering as they offer enormous flexibility at no additional cost: we could
> offer users an interface with fixed types only (e.g. via a macro), but, in
> the end, all our memory structures are dynamically typed anyway (Array,
> RecordBatch, etc.), and thus, whether done by the user or by us, a downcast
> will still need to take place at runtime.
>
> 3. Users are still able to specify the type they want in query languages
> that support polymorphic functions such as postgres, both at the query
> level and on polymorphic UDFs. Most dialects support cast operations that
> allow users to narrow types (::float in postgres, CAST(x AS float64) in
> spark), that are only physically executed if needed.
>
> So, to summarize my thoughts so far:
>
> i) DataFusion is dynamically typed by design
> ii) We already support dynamically typed scalar UDFs
> iii) we currently have polymorphic functions (internally) and already have
> to deal with them on our logical and physical plans.
> iv) there is no practical limitation to supporting polymorphic UDFs; it is
> a matter of whether the benefits outweigh the development and maintenance
> costs.
>
> I am inclined to say that given i-iii), we should support polymorphic
> (scalar and agg) UDFs, which would put us on the same level of UDF support
> as postgres. However, we should offer a very easy interface for users to
> register a non-polymorphic UDF, e.g.
>
> ctx.register(name, udf(callable, arg_types, return_type)?)?
>
> where udf returns the specialization of a generic UDF that expects N types
> and returns return_type.
>
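One possible shape for the `ctx.register(name, udf(callable, arg_types, return_type)?)?` call above, as a sketch only. `Context`, `Udf`, `Callable` and the row-wise `fn(f64) -> f64` callable are all assumptions made for this illustration; none of them is DataFusion's actual interface.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Float32,
    Float64,
}

/// Placeholder for the user's function; a real engine would pass arrays.
type Callable = fn(f64) -> f64;

/// General (polymorphic) UDF: the return type is a function of the
/// argument types and is evaluated during planning.
struct Udf {
    fun: Callable,
    return_type: Box<dyn Fn(&[DataType]) -> Result<DataType, String>>,
}

/// Specialization of the general UDF: exactly `arg_types` in, `return_type` out.
fn udf(fun: Callable, arg_types: Vec<DataType>, return_type: DataType) -> Result<Udf, String> {
    if arg_types.is_empty() {
        return Err("a UDF needs at least one argument".to_string());
    }
    Ok(Udf {
        fun,
        return_type: Box::new(move |actual: &[DataType]| {
            if actual == arg_types.as_slice() {
                Ok(return_type)
            } else {
                Err(format!("expected {:?}, got {:?}", arg_types, actual))
            }
        }),
    })
}

#[derive(Default)]
struct Context {
    udfs: HashMap<String, Udf>,
}

impl Context {
    fn register(&mut self, name: &str, udf: Udf) -> Result<(), String> {
        if self.udfs.contains_key(name) {
            return Err(format!("{} is already registered", name));
        }
        self.udfs.insert(name.to_string(), udf);
        Ok(())
    }

    /// Planning-time lookup: which type does `name` return for these inputs?
    fn return_type(&self, name: &str, args: &[DataType]) -> Result<DataType, String> {
        let udf = self.udfs.get(name).ok_or_else(|| format!("unknown function {}", name))?;
        (udf.return_type)(args)
    }

    /// Execution-time call on a single value (toy).
    fn call(&self, name: &str, x: f64) -> Result<f64, String> {
        let udf = self.udfs.get(name).ok_or_else(|| format!("unknown function {}", name))?;
        Ok((udf.fun)(x))
    }
}
```

The design choice here is that the fixed-type registration is just a thin wrapper: the engine only ever sees the general form, and a user who never needs polymorphism never has to write a return-type function.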
> Best,
> Jorge
>
>
> On Tue, Aug 18, 2020 at 6:52 PM Andrew Lamb <al...@influxdata.com> wrote:
>
> > It is my personal opinion that actual UDF functions registered with
> > DataFusion should take a known set of input types and a single return
> > type (e.g. sum_i32 --> i32). I think this would:
> > 1. Simplify the implementation of both the DataFusion optimizer and the
> > UDFs
> > 2. Make it easier for UDF writers as the UDF code would look more like
> > Rust: the types would be clear from the function signatures, as is the
> > case in Rust in general
> > 3. Give the user of SQL / DataFrames the ability to specify exactly
> > what types they want
> >
> > If we wanted the ability for the user to specify `sum(i)` and let the
> > type coercion pass pick `sum_i32` or `sum_i64` depending on the input
> > types, I recommend doing that at a different level than the UDF (perhaps
> > via `register_alias("sum", "sum_i32")` or something), again for both
> > clarity of DataFusion implementation as well as UDF specification.
> >
> > Andrew
> >
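A sketch of how the alias idea could work, under assumptions: `Registry`, `register_udf`, `register_alias` and `resolve` are hypothetical names, and the matching rule (first aliased UDF whose argument types match exactly) is one possible choice, not something the thread specifies.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int32,
    Int64,
}

/// A monomorphic UDF: fixed argument types, one return type.
struct Signature {
    arg_types: Vec<DataType>,
    return_type: DataType,
}

#[derive(Default)]
struct Registry {
    udfs: HashMap<String, Signature>,
    /// Alias name -> monomorphic UDF names it may stand for.
    aliases: HashMap<String, Vec<String>>,
}

impl Registry {
    fn register_udf(&mut self, name: &str, arg_types: Vec<DataType>, return_type: DataType) {
        self.udfs.insert(name.to_string(), Signature { arg_types, return_type });
    }

    fn register_alias(&mut self, alias: &str, target: &str) {
        self.aliases
            .entry(alias.to_string())
            .or_default()
            .push(target.to_string());
    }

    /// Done by the coercion pass: turn a name plus input types into the
    /// concrete UDF name and its return type, or None if nothing matches.
    fn resolve(&self, name: &str, args: &[DataType]) -> Option<(String, DataType)> {
        // A name that is not an alias is its own only candidate.
        let direct = vec![name.to_string()];
        let candidates = self.aliases.get(name).unwrap_or(&direct);
        candidates.iter().find_map(|candidate| {
            let sig = self.udfs.get(candidate)?;
            if sig.arg_types.as_slice() == args {
                Some((candidate.clone(), sig.return_type))
            } else {
                None
            }
        })
    }
}
```

With `sum_i32` and `sum_i64` registered and both aliased to `sum`, resolving `sum` against an `Int64` input yields `sum_i64`. The UDFs themselves stay monomorphic; only the lookup is polymorphic.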
> > On Mon, Aug 17, 2020 at 4:52 PM Jorge Cardoso Leitão <
> > jorgecarleitao@gmail.com> wrote:
> >
> > > Thanks Andrew,
> > >
> > > I am not sure I articulated this well enough, though, as I did not
> > > specify the type of polymorphism that I was thinking about. xD
> > >
> > > My question was/is about whether we should accept functions whose
> > > return type is known during planning, and constant during execution, or
> > > whether their return types must be constant both during planning and
> > > execution. I do not think we should support variable types during
> > > execution for the reasons that you enumerated. If by runtime
> > > polymorphism you mean changing types during execution, then I very much
> > > agree with you that that is a no-no.
> > >
> > > During planning, though, we have options: should we allow users to
> > > write something like `my_operation(f32|f64) -> (f32|f64)`, in which the
> > > type is inferred after we know the function's input in the logical
> > > plan, or should we not allow that and require users to register
> > > `my_operation_f32(f32)` and `my_operation_f64(f64)` separately? The
> > > three findings that I mentioned above refer to planned polymorphism:
> > > the return type is resolved during planning (and constant during
> > > execution).
> > >
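A sketch of what a `my_operation(f32|f64) -> (f32|f64)` declaration could look like if planned polymorphism were allowed. `PolymorphicSignature`, `my_operation` and `plan_return_type` are hypothetical names for this illustration; the point is only that the return type is computed once, from the input type, while planning.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Float32,
    Float64,
    Utf8,
}

/// Declares which input types are accepted and how the return type
/// follows from the input type.
struct PolymorphicSignature {
    name: &'static str,
    valid_types: &'static [DataType],
    return_type: fn(DataType) -> DataType,
}

/// Return type equals the input type: f32 -> f32, f64 -> f64.
fn same_as_input(input: DataType) -> DataType {
    input
}

fn my_operation() -> PolymorphicSignature {
    PolymorphicSignature {
        name: "my_operation",
        valid_types: &[DataType::Float32, DataType::Float64],
        return_type: same_as_input,
    }
}

/// Called once per call site while building the logical plan. After this,
/// the type is fixed and does not change during execution.
fn plan_return_type(sig: &PolymorphicSignature, input: DataType) -> Result<DataType, String> {
    if sig.valid_types.contains(&input) {
        Ok((sig.return_type)(input))
    } else {
        Err(format!("{} does not accept {:?}", sig.name, input))
    }
}
```

The alternative in the paragraph above, registering `my_operation_f32` and `my_operation_f64` separately, amounts to two signatures with a single entry in `valid_types` each and a constant return type.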
> > > The biggest use-case IMO for polymorphism during planning is for
> > > functions that yield structures/lists of values (a-la collect_list)
> > > whose type can only be inferred after we know the function's input
> > > type (array(f32) vs array(f64)), and whose implementation can be
> > > generalized via a macro + match.
> > >
> > > From a technical point of view, we currently have functions with
> > > variable types (all binary operators' return type depends on the lhs'
> > > type, sum, max/min, etc.), and we have to handle the main planning
> > > challenges already. In this context, the questions are something like:
> > >
> > > 1. should we continue to have them or should we move away from them?
> > > 2.1 If not, what should we do with them? E.g. declare sum_i32, sum_i64,
> > > etc., that have a single return type?
> > > 2.2 if yes, should we allow users to register these types of functions,
> > > or should these only be allowed within DataFusion's code base?
> > >
> > > Best,
> > > Jorge
> > >
> > >
> > >
> > > On Mon, Aug 17, 2020 at 9:53 PM Andrew Lamb <al...@influxdata.com>
> > > wrote:
> > >
> > > > In my opinion, I suggest we do not continue down the path of
> > > > (runtime) polymorphic functions unless a compelling use case for them
> > > > can be articulated.
> > > >
> > > > You have done a great job articulating some of the implementation
> > > > challenges, but I personally struggle to describe when, as a user of
> > > > DataFusion, I would want to write a (runtime) polymorphic function.
> > > >
> > > > A function with runtime polymorphism I think would mean the UDF could
> > > > handle the type changing *at runtime*: record batches could come in
> > > > with multiple different types during the same execution. I can't think
> > > > of examples where this behavior would be desirable or necessary.
> > > >
> > > > The existing DataFusion codebase seems to assume (reasonably in my
> > > > opinion) that the schema of each Logical / Physical plan node is known
> > > > at planning time and it does not change at runtime.
> > > >
> > > > Most query optimizers (and compilers for that matter) take advantage
> > > > of plan (compile) time type information to make runtime more
> > > > efficient. Also, it seems like other database / runtime systems such as
> > > > mysql[1] and postgres[2] require the UDF creator to explicitly specify
> > > > the return type as well. I think we should consider the simpler
> > > > semantics of "1 return type for each UDF" to make it easier on people
> > > > writing UDFs as well as simplifying the implementation of DataFusion
> > > > itself.
> > > >
> > > > Andrew
> > > >
> > > > [1] https://dev.mysql.com/doc/refman/8.0/en/create-function-udf.html
> > > > [2] https://www.postgresql.org/docs/12/sql-createfunction.html
> > > >
> > > > On Mon, Aug 17, 2020 at 12:31 PM Jorge Cardoso Leitão <
> > > > jorgecarleitao@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Recently, I have been contributing to DataFusion, and I would like
> to
> > > > bring
> > > > > to your attention a question that I faced while PRing to DataFusion
> > > that
> > > > > IMO needs some alignment :)
> > > > >
> > > > > DataFusion supports scalar UDFs: functions that expect a type,
> > return a
> > > > > type, and performs some operation on the data (a-la spark UDF).
> > > However,
> > > > > the execution engine is actually dynamically typed:
> > > > >
> > > > > * a scalar UDF receives an &[ArrayRef] that must be downcasted
> > > > accordingly
> > > > > * a scalar UDF must select the builder that matches its signature,
> so
> > > > that
> > > > > its return type matches the ArrayRef that it returns.
> > > > >
> > > > > This suggests that we can treat functions as polymorphic: as long
> as
> > > the
> > > > > function handles the different types (e.g. via match), we are good.
> > We
> > > > > currently do not support multiple input types nor variable return
> > types
> > > > in
> > > > > their function signatures.
> > > > >
> > > > > Our current (non-udf) scalar and aggregate functions are already
> > > > > polymorphic on both their input and return type: sum(i32) -> i64,
> > > > sum(f64)
> > > > > -> f64, "a + b". I have been working on PRs to support polymorphic
> > > > support
> > > > > to scalar UDFs (e.g. sqrt() can take float32 and float64) [1,3], as
> > > well
> > > > as
> > > > > polymorphic aggregate UDFs [2], so that we can extend our offering
> to
> > > > more
> > > > > interesting functions such as "length(t) -> uint", "array(c1, c2)",
> > > > > "collect_list(t) -> array(t)", etc.
> > > > >
> > > > > However, while working on [1,2,3], I reach some non-trivial
> findings
> > > > that I
> > > > > would like to share:
> > > > >
> > > > > Finding 1: to support polymorphic functions, our logical and
> physical
> > > > > expressions (Expr and PhysicalExpr) need to be polymorphic as-well:
> > > once
> > > > a
> > > > > function is polymorphic, any expression containing it is also
> > > > polymorphic.
> > > > >
> > > > > Finding 2: when a polymorphic expression passes through our type
> > > coercer
> > > > > optimizer (that tries to coerce types to match a function's
> > signature),
> > > > it
> > > > > may be re-casted to a different type. If the return type changes,
> the
> > > > > optimizer may need to re-cast operations dependent of the function
> > call
> > > > > (e.g. a projection followed by an aggregation may need a recast on
> > the
> > > > > projection and on the aggregation).
> > > > >
> > > > > Finding 3: when an expression passes through our type coercer
> > optimizer
> > > > and
> > > > > is re-casted, its name changes (typically from "expr" to "CAST(expr
> > as
> > > > > X)"). This implies that a column referenced as #expr down the plan
> > may
> > > > not
> > > > > exist depending on the input type of the initial projection/scan.
> > > > >
> > > > > Finding 1 and 2 IMO are a direct consequence of polymorphism and
> the
> > > only
> > > > > way to not handle them is by not supporting polymorphism (e.g. the
> > user
> > > > > registers sqrt_f32 and sqrt_f64, etc).
> > > > >
> > > > > Finding 3 can be addressed in at least three ways:
> > > > >
> > > > > A) make the optimizer rewrite the expression as "CAST(expr as X) AS
> > > > expr",
> > > > > so that it retains its original name. This hides the actual
> > > expression's
> > > > > calculation, but preserves its original name.
> > > > > B) accept that expressions can always change its name, which means
> > that
> > > > the
> > > > > user should be mindful when writing `col("SELECT sqrt(x) FROM t"`,
> as
> > > the
> > > > > column name may end up being called `"sqrt(CAST(x as X))"`.
> > > > > C) Do not support polymorphic functions
> > > > >
> > > > > Note that we currently already experience effects 1-3, it is just
> > that
> > > we
> > > > > use so few polymorphic functions that these seldomly present
> > > themselves.
> > > > It
> > > > > was while working on [1,2,3] that I start painting the bigger
> > picture.
> > > > >
> > > > > Some questions:
> > > > > 1. should continue down the path of polymorphic functions?
> > > > > 2. if yes, how do handle finding 3?
> > > > >
> > > > > Looking at the current code base, I am confident that we can
> address
> > > the
> > > > > technical issues to support polymorphic functions. However, it
> would
> > be
> > > > > interesting to have your thoughts on this.
> > > > >
> > > > > [1] https://github.com/apache/arrow/pull/7967
> > > > > [2] https://github.com/apache/arrow/pull/7971
> > > > > [3] https://github.com/apache/arrow/pull/7974
> > > > >
> > > >
> > >
> >
>

Re: Polymorphism in DataFusion

Posted by Jorge Cardoso Leitão <jo...@gmail.com>.
Hi,

Thank you for this enlightening discussion, Andrew!

So, just to make sure I understood, are you proposing A), B) or something
else?

A) we should not accept / declare polymorphic operations: all types should
be known based on the operation name (e.g. sum_f32, plus_f32, etc.)
B) we should continue to have "sum", "count", "+", etc. as polymorphic
operations, but we should not allow registering udfs as polymorphic,
neither internally nor externally. I.e. all polymorphic operations are
hard-coded.

Let's assume A) first. I relate to the sentiment that Rust is statically
typed. However, as I see it, DataFusion is not: our main abstractions are
arrow::array::Array and RecordBatch, which are both dynamically typed (e.g.
Array::{data_type,as_any} and RecordBatch). Since all ops are also
dynamically typed (they receive Arc<dyn Array> or RecordBatch) and use
runtime reflection via `match array.data_type()` at the physical level to
downcast Array to its respective native type, wouldn't A) lead to a major
change in DataFusion?
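
To make the "runtime reflection" point concrete, here is a minimal sketch of
the match-and-downcast pattern. It does not use the arrow crate: `DataType`,
`Array`, `Float32Array`, `Float64Array` and `sqrt` below are simplified,
hypothetical stand-ins that only mirror the shape of
Array::{data_type,as_any}.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical, simplified stand-ins for the arrow types (assumption:
// only the data_type/as_any shape matters for this illustration).
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Float32,
    Float64,
}

pub trait Array {
    fn data_type(&self) -> DataType;
    fn as_any(&self) -> &dyn Any;
}

pub struct Float32Array(pub Vec<f32>);
pub struct Float64Array(pub Vec<f64>);

impl Array for Float32Array {
    fn data_type(&self) -> DataType {
        DataType::Float32
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Array for Float64Array {
    fn data_type(&self) -> DataType {
        DataType::Float64
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

pub type ArrayRef = Arc<dyn Array>;

// The op receives a dynamically typed array, inspects its type at runtime,
// downcasts to the concrete array, and builds an output of the same type.
pub fn sqrt(arg: &ArrayRef) -> Result<ArrayRef, String> {
    match arg.data_type() {
        DataType::Float32 => {
            let a = arg
                .as_any()
                .downcast_ref::<Float32Array>()
                .ok_or("expected a Float32Array")?;
            Ok(Arc::new(Float32Array(a.0.iter().map(|v| v.sqrt()).collect())))
        }
        DataType::Float64 => {
            let a = arg
                .as_any()
                .downcast_ref::<Float64Array>()
                .ok_or("expected a Float64Array")?;
            Ok(Arc::new(Float64Array(a.0.iter().map(|v| v.sqrt()).collect())))
        }
    }
}
```

Even a function registered with a single fixed signature would end up doing
one of these downcasts somewhere, which is the point being made above.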

Let's now assume B), and let me try to expand on your 3 points:

1. Once an operation in our plan is polymorphic, the whole plan is
polymorphic and the final schema can only be inferred from the initial
projection's schema / scan. A simple example of this using only functions
that we currently support is:

df = scan([c1 float32, c2 float64, c3 float64])
df = df.select(c1 * c2 as sum12, c1 * c3 as sum13)
df = df.aggregate(MIN(sum12), MIN(sum13))

The plan for this is that the first product returns a float32 (lower
precision of both), and the second returns a float64. MIN's return type now
depends on the first select's return type, which is in a previous node. So,
even if only our internal ops are polymorphic, this is sufficient to
require our optimizers to handle dynamically typed expressions and schemas
whose type is only known during planning (after the scan's schema is known).

2. I relate to that sentiment. At about the same time that Andy proposed
the dynamically typed UDFs (which are now our UDFs), I made a 1k+ proposal
for statically typed UDFs. In retrospect, IMO dynamically typed UDFs are a
far superior offering, as they offer enormous flexibility at no additional
cost: we could offer users an interface with fixed types only (e.g. via a
macro), but, in the end, all our memory structures are dynamically typed
anyway (Array, RecordBatch, etc.), and thus, whether it is done by the user
or by us, a downcast will still need to take place at runtime.

3. Users are still able to specify the type they want in query languages
that support polymorphic functions such as postgres, both at the query
level and on polymorphic UDFs. Most dialects support cast operations that
allow users to narrow types (::float in postgres, CAST(x AS float64) in
spark), that are only physically executed if needed.

So, to summarize my thoughts so far:

i) DataFusion is dynamically typed by design
ii) We already support dynamically typed scalar UDFs
iii) we currently have polymorphic functions (internally) and already have
to deal with them on our logical and physical plans.
iv) there is no practical limitation to supporting polymorphic UDFs; it is
a matter of whether the benefits outweigh the development and maintenance
costs.

I am inclined to say that given i-iii), we should support polymorphic
(scalar and agg) UDFs, which would put us on the same level of UDF support
as postgres. However, we should offer a very easy interface for users to
register a non-polymorphic UDF, e.g.

ctx.register(name, udf(callable, arg_types, return_type)?)?

where udf returns the specialization of a generic UDF that expects N types
and returns return_type.
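
A rough sketch of what such a specialization could look like, assuming the
generic UDF carries a closure that resolves its return type from the
argument types during planning. All names here (`ScalarUdf`, `ReturnTypeFn`,
`udf`, `polymorphic_sqrt`) are hypothetical and are not DataFusion's API;
the callable itself is left out to keep the sketch short.

```rust
use std::sync::Arc;

// Hypothetical stand-in for arrow's DataType.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Float32,
    Float64,
    Int64,
}

// Resolves the return type from the argument types at planning time.
pub type ReturnTypeFn =
    Arc<dyn Fn(&[DataType]) -> Result<DataType, String> + Send + Sync>;

// A generic (possibly polymorphic) scalar UDF declaration.
pub struct ScalarUdf {
    pub name: String,
    pub return_type: ReturnTypeFn,
}

// Non-polymorphic specialization: exactly one accepted signature and one
// return type, expressed through the same generic structure.
pub fn udf(name: &str, arg_types: Vec<DataType>, return_type: DataType) -> ScalarUdf {
    ScalarUdf {
        name: name.to_string(),
        return_type: Arc::new(move |actual: &[DataType]| {
            if actual == arg_types.as_slice() {
                Ok(return_type.clone())
            } else {
                Err(format!("expected {:?}, got {:?}", arg_types, actual))
            }
        }),
    }
}

// A polymorphic declaration for comparison: the return type follows the
// input type and is fixed once the input type is known.
pub fn polymorphic_sqrt() -> ScalarUdf {
    ScalarUdf {
        name: "sqrt".to_string(),
        return_type: Arc::new(|actual: &[DataType]| match actual {
            [DataType::Float32] => Ok(DataType::Float32),
            [DataType::Float64] => Ok(DataType::Float64),
            other => Err(format!("sqrt does not support {:?}", other)),
        }),
    }
}
```

With this shape, the planner only ever calls `return_type` with the argument
types it has inferred, so fixed-type and polymorphic UDFs go through the
same code path.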

Best,
Jorge


On Tue, Aug 18, 2020 at 6:52 PM Andrew Lamb <al...@influxdata.com> wrote:

> It is my personal opinion that actual UDF functions  registered with data
> fusion should take a known set of input types and single return type (e.g.
> sum_i32 --> i32). I think this would:
> 1. Simplify the implementation of both the DataFusion optimizer and the
> UDFs
> 2. Make it easier for UDF writers as the UDF code would look more like
> Rust: the types would be clear from the function signatures, as is the case
> in Rust in general
> 3. Give the user of SQL / DataFrames the ability to specifically specify
> what types they want
>
> If we wanted the ability for the user to specify `sum(i)` and let the type
> coercion pass pick `sum_i32` or `sum_i64` depending on the input types, I
> recommend doing that at a different level than the UDF (perhaps via
> `register_alias("sum", "sum_i32)` or something), again for both clarity of
> DataFusion implementation as well as UDF specification.
>
> Andrew
>
> On Mon, Aug 17, 2020 at 4:52 PM Jorge Cardoso Leitão <
> jorgecarleitao@gmail.com> wrote:
>
> > Thanks Andrew,
> >
> > I am not sure I articulated this well enough, though, as I did not
> specify
> > the type of polymorphism that I was thinking about. xD
> >
> > My question was/is about whether we should accept functions whose return
> > type is known during planning, and constant during execution, or whether
> > their return types must be constant both during planning and execution. I
> > do not think we should support variable types during execution for the
> > reasons that you enumerated. If by runtime polymorphism you mean changing
> > types during execution, then I very much agree with you that that is a
> > no-no.
> >
> > During planning, though, we have options: should we allow users to write
> > something like `my_operation(f32|f64) -> (f32|f64)`, on which the type is
> > inferred after we know the function's input in the logical plan, or
> should
> > we not allow that and require users to register `my_operation_f32(f32)`
> and
> > `my_operation_f64(f64)` separately? The three findings that I mentioned
> > above refer to planned polymorphism: return type is resolved during
> > planning (and constant during execution).
> >
> > The biggest use-case IMO for polymorphism during planning is for
> functions
> > that yield structures/lists of values (a-la collect_list) whose type can
> > only be inferred after we know the functions' input type (array(f32) vs
> > array(f64)), and whose implementation can be generalized via a macro +
> > match.
> >
> > From a technical point of view, we currently have functions with variable
> > types (all binary operators' return type depends on the lhs' type, sum,
> > max/min, etc.), and we have to handle the main planning challenges
> already.
> > In this context, the questions are something like:
> >
> > 1. should we continue to have them or should we move away from them?
> > 2.1 If not, what should we do with them? E.g. declare sum_i32, sum_i64,
> > etc., that have a single return type?
> > 2.2 if yes, show we allow users to register these types of functions, or
> > should these only be allowed within DataFusion's code base?
> >
> > Best,
> > Jorge
> >
> >
> >
> > On Mon, Aug 17, 2020 at 9:53 PM Andrew Lamb <al...@influxdata.com>
> wrote:
> >
> > > In my opinion, I suggest we do not continue down the path of (runtime)
> > > polymorphic functions unless a compelling use case for them can be
> > > articulated.
> > >
> > > You have done a great job articulating some of the implementation
> > > challenges, but I personally struggle to describe when, as a user of
> > > DataFusion, I would want to write a (runtime) polymorphic function.
> > >
> > > A function with runtime polymorphism I think would mean the UDF could
> > > handle the type changing *at runtime*: record batches could come in
> with
> > > multiple different types during the same execution. I can't think of
> > > examples where this behavior would be desirable or necessary.
> > >
> > > The existing DataFusion codebase seems to assume (reasonably in my
> > opinion)
> > > that the schema of each Logical / Physical plan node is known at
> planning
> > > time and it does not change at runtime.
> > >
> > > Most query optimizers (and compilers for that matter) take advantage of
> > > plan (compile) time type information to make runtime more efficient.
> > Also,
> > > it seems like other database / runtime systems such as mysql[1] and
> > > postgres[2] require the UDF creator to explicitly specify the return
> type
> > > as well. I think we should consider the simpler semantics of "1 return
> > type
> > > for each UDF" to make it easier on people writing UDFs as well as
> > > simplifying the implementation of DataFusion itself.
> > >
> > > Andrew
> > >
> > > [1] https://dev.mysql.com/doc/refman/8.0/en/create-function-udf.html
> > > [2] https://www.postgresql.org/docs/12/sql-createfunction.html
> > >
> > > On Mon, Aug 17, 2020 at 12:31 PM Jorge Cardoso Leitão <
> > > jorgecarleitao@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > Recently, I have been contributing to DataFusion, and I would like to
> > > bring
> > > > to your attention a question that I faced while PRing to DataFusion
> > that
> > > > IMO needs some alignment :)
> > > >
> > > > DataFusion supports scalar UDFs: functions that expect a type,
> return a
> > > > type, and performs some operation on the data (a-la spark UDF).
> > However,
> > > > the execution engine is actually dynamically typed:
> > > >
> > > > * a scalar UDF receives an &[ArrayRef] that must be downcasted
> > > accordingly
> > > > * a scalar UDF must select the builder that matches its signature, so
> > > that
> > > > its return type matches the ArrayRef that it returns.
> > > >
> > > > This suggests that we can treat functions as polymorphic: as long as
> > the
> > > > function handles the different types (e.g. via match), we are good.
> We
> > > > currently do not support multiple input types nor variable return
> types
> > > in
> > > > their function signatures.
> > > >
> > > > Our current (non-udf) scalar and aggregate functions are already
> > > > polymorphic on both their input and return type: sum(i32) -> i64,
> > > sum(f64)
> > > > -> f64, "a + b". I have been working on PRs to support polymorphic
> > > support
> > > > to scalar UDFs (e.g. sqrt() can take float32 and float64) [1,3], as
> > well
> > > as
> > > > polymorphic aggregate UDFs [2], so that we can extend our offering to
> > > more
> > > > interesting functions such as "length(t) -> uint", "array(c1, c2)",
> > > > "collect_list(t) -> array(t)", etc.
> > > >
> > > > However, while working on [1,2,3], I reach some non-trivial findings
> > > that I
> > > > would like to share:
> > > >
> > > > Finding 1: to support polymorphic functions, our logical and physical
> > > > expressions (Expr and PhysicalExpr) need to be polymorphic as-well:
> > once
> > > a
> > > > function is polymorphic, any expression containing it is also
> > > polymorphic.
> > > >
> > > > Finding 2: when a polymorphic expression passes through our type
> > coercer
> > > > optimizer (that tries to coerce types to match a function's
> signature),
> > > it
> > > > may be re-casted to a different type. If the return type changes, the
> > > > optimizer may need to re-cast operations dependent of the function
> call
> > > > (e.g. a projection followed by an aggregation may need a recast on
> the
> > > > projection and on the aggregation).
> > > >
> > > > Finding 3: when an expression passes through our type coercer
> optimizer
> > > and
> > > > is re-casted, its name changes (typically from "expr" to "CAST(expr
> as
> > > > X)"). This implies that a column referenced as #expr down the plan
> may
> > > not
> > > > exist depending on the input type of the initial projection/scan.
> > > >
> > > > Finding 1 and 2 IMO are a direct consequence of polymorphism and the
> > only
> > > > way to not handle them is by not supporting polymorphism (e.g. the
> user
> > > > registers sqrt_f32 and sqrt_f64, etc).
> > > >
> > > > Finding 3 can be addressed in at least three ways:
> > > >
> > > > A) make the optimizer rewrite the expression as "CAST(expr as X) AS
> > > expr",
> > > > so that it retains its original name. This hides the actual
> > expression's
> > > > calculation, but preserves its original name.
> > > > B) accept that expressions can always change its name, which means
> that
> > > the
> > > > user should be mindful when writing `col("SELECT sqrt(x) FROM t"`, as
> > the
> > > > column name may end up being called `"sqrt(CAST(x as X))"`.
> > > > C) Do not support polymorphic functions
> > > >
> > > > Note that we currently already experience effects 1-3, it is just
> that
> > we
> > > > use so few polymorphic functions that these seldomly present
> > themselves.
> > > It
> > > > was while working on [1,2,3] that I start painting the bigger
> picture.
> > > >
> > > > Some questions:
> > > > 1. should continue down the path of polymorphic functions?
> > > > 2. if yes, how do handle finding 3?
> > > >
> > > > Looking at the current code base, I am confident that we can address
> > the
> > > > technical issues to support polymorphic functions. However, it would
> be
> > > > interesting to have your thoughts on this.
> > > >
> > > > [1] https://github.com/apache/arrow/pull/7967
> > > > [2] https://github.com/apache/arrow/pull/7971
> > > > [3] https://github.com/apache/arrow/pull/7974
> > > >
> > >
> >
>

Re: Polymorphism in DataFusion

Posted by Andrew Lamb <al...@influxdata.com>.
It is my personal opinion that actual UDFs registered with DataFusion
should take a known set of input types and a single return type (e.g.
sum_i32 --> i32). I think this would:
1. Simplify the implementation of both the DataFusion optimizer and the UDFs
2. Make it easier for UDF writers as the UDF code would look more like
Rust: the types would be clear from the function signatures, as is the case
in Rust in general
3. Give the user of SQL / DataFrames the ability to explicitly specify
what types they want

If we wanted the ability for the user to specify `sum(i)` and let the type
coercion pass pick `sum_i32` or `sum_i64` depending on the input types, I
recommend doing that at a different level than the UDF (perhaps via
`register_alias("sum", "sum_i32")` or something), again for both clarity of
DataFusion implementation as well as UDF specification.
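
As a rough illustration of the alias idea, here is a sketch in which every
registered UDF has exactly one signature and a separate alias table maps a
generic name to candidate UDFs; resolution picks the candidate whose argument
types match. `Registry`, `MonomorphicUdf`, `register_udf` and `resolve` are
hypothetical names, and only `register_alias` comes from the suggestion
above.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for arrow's DataType.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int32,
    Int64,
}

// A UDF with a single, fixed signature.
pub struct MonomorphicUdf {
    pub name: String,
    pub arg_types: Vec<DataType>,
    pub return_type: DataType,
}

#[derive(Default)]
pub struct Registry {
    udfs: HashMap<String, MonomorphicUdf>,
    // alias name -> names of the concrete UDFs it may resolve to
    aliases: HashMap<String, Vec<String>>,
}

impl Registry {
    pub fn register_udf(&mut self, udf: MonomorphicUdf) {
        self.udfs.insert(udf.name.clone(), udf);
    }

    pub fn register_alias(&mut self, alias: &str, target: &str) {
        self.aliases
            .entry(alias.to_string())
            .or_default()
            .push(target.to_string());
    }

    // Called during planning, once the argument types are known.
    pub fn resolve(&self, name: &str, arg_types: &[DataType]) -> Option<&MonomorphicUdf> {
        // A concrete name resolves to itself if the signature matches.
        if let Some(udf) = self.udfs.get(name) {
            if udf.arg_types == arg_types {
                return Some(udf);
            }
        }
        // Otherwise, try each alias target in registration order.
        self.aliases
            .get(name)?
            .iter()
            .filter_map(|target| self.udfs.get(target))
            .find(|udf| udf.arg_types == arg_types)
    }
}
```

In this sketch the UDFs themselves stay monomorphic; the choice between
`sum_i32` and `sum_i64` is made entirely by the alias lookup.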

Andrew

On Mon, Aug 17, 2020 at 4:52 PM Jorge Cardoso Leitão <
jorgecarleitao@gmail.com> wrote:

> Thanks Andrew,
>
> I am not sure I articulated this well enough, though, as I did not specify
> the type of polymorphism that I was thinking about. xD
>
> My question was/is about whether we should accept functions whose return
> type is known during planning, and constant during execution, or whether
> their return types must be constant both during planning and execution. I
> do not think we should support variable types during execution for the
> reasons that you enumerated. If by runtime polymorphism you mean changing
> types during execution, then I very much agree with you that that is a
> no-no.
>
> During planning, though, we have options: should we allow users to write
> something like `my_operation(f32|f64) -> (f32|f64)`, on which the type is
> inferred after we know the function's input in the logical plan, or should
> we not allow that and require users to register `my_operation_f32(f32)` and
> `my_operation_f64(f64)` separately? The three findings that I mentioned
> above refer to planned polymorphism: return type is resolved during
> planning (and constant during execution).
>
> The biggest use-case IMO for polymorphism during planning is for functions
> that yield structures/lists of values (a-la collect_list) whose type can
> only be inferred after we know the functions' input type (array(f32) vs
> array(f64)), and whose implementation can be generalized via a macro +
> match.
>
> From a technical point of view, we currently have functions with variable
> types (all binary operators' return type depends on the lhs' type, sum,
> max/min, etc.), and we have to handle the main planning challenges already.
> In this context, the questions are something like:
>
> 1. should we continue to have them or should we move away from them?
> 2.1 If not, what should we do with them? E.g. declare sum_i32, sum_i64,
> etc., that have a single return type?
> 2.2 if yes, show we allow users to register these types of functions, or
> should these only be allowed within DataFusion's code base?
>
> Best,
> Jorge
>
>
>
> On Mon, Aug 17, 2020 at 9:53 PM Andrew Lamb <al...@influxdata.com> wrote:
>
> > In my opinion, I suggest we do not continue down the path of (runtime)
> > polymorphic functions unless a compelling use case for them can be
> > articulated.
> >
> > You have done a great job articulating some of the implementation
> > challenges, but I personally struggle to describe when, as a user of
> > DataFusion, I would want to write a (runtime) polymorphic function.
> >
> > A function with runtime polymorphism I think would mean the UDF could
> > handle the type changing *at runtime*: record batches could come in with
> > multiple different types during the same execution. I can't think of
> > examples where this behavior would be desirable or necessary.
> >
> > The existing DataFusion codebase seems to assume (reasonably in my
> opinion)
> > that the schema of each Logical / Physical plan node is known at planning
> > time and it does not change at runtime.
> >
> > Most query optimizers (and compilers for that matter) take advantage of
> > plan (compile) time type information to make runtime more efficient.
> Also,
> > it seems like other database / runtime systems such as mysql[1] and
> > postgres[2] require the UDF creator to explicitly specify the return type
> > as well. I think we should consider the simpler semantics of "1 return
> type
> > for each UDF" to make it easier on people writing UDFs as well as
> > simplifying the implementation of DataFusion itself.
> >
> > Andrew
> >
> > [1] https://dev.mysql.com/doc/refman/8.0/en/create-function-udf.html
> > [2] https://www.postgresql.org/docs/12/sql-createfunction.html
> >
> > On Mon, Aug 17, 2020 at 12:31 PM Jorge Cardoso Leitão <
> > jorgecarleitao@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Recently, I have been contributing to DataFusion, and I would like to
> > bring
> > > to your attention a question that I faced while PRing to DataFusion
> that
> > > IMO needs some alignment :)
> > >
> > > DataFusion supports scalar UDFs: functions that expect a type, return a
> > > type, and performs some operation on the data (a-la spark UDF).
> However,
> > > the execution engine is actually dynamically typed:
> > >
> > > * a scalar UDF receives an &[ArrayRef] that must be downcasted
> > accordingly
> > > * a scalar UDF must select the builder that matches its signature, so
> > that
> > > its return type matches the ArrayRef that it returns.
> > >
> > > This suggests that we can treat functions as polymorphic: as long as
> the
> > > function handles the different types (e.g. via match), we are good. We
> > > currently do not support multiple input types nor variable return types
> > in
> > > their function signatures.
> > >
> > > Our current (non-udf) scalar and aggregate functions are already
> > > polymorphic on both their input and return type: sum(i32) -> i64,
> > sum(f64)
> > > -> f64, "a + b". I have been working on PRs to support polymorphic
> > support
> > > to scalar UDFs (e.g. sqrt() can take float32 and float64) [1,3], as
> well
> > as
> > > polymorphic aggregate UDFs [2], so that we can extend our offering to
> > more
> > > interesting functions such as "length(t) -> uint", "array(c1, c2)",
> > > "collect_list(t) -> array(t)", etc.
> > >
> > > However, while working on [1,2,3], I reach some non-trivial findings
> > that I
> > > would like to share:
> > >
> > > Finding 1: to support polymorphic functions, our logical and physical
> > > expressions (Expr and PhysicalExpr) need to be polymorphic as-well:
> once
> > a
> > > function is polymorphic, any expression containing it is also
> > polymorphic.
> > >
> > > Finding 2: when a polymorphic expression passes through our type
> coercer
> > > optimizer (that tries to coerce types to match a function's signature),
> > it
> > > may be re-casted to a different type. If the return type changes, the
> > > optimizer may need to re-cast operations dependent of the function call
> > > (e.g. a projection followed by an aggregation may need a recast on the
> > > projection and on the aggregation).
> > >
> > > Finding 3: when an expression passes through our type coercer optimizer
> > and
> > > is re-casted, its name changes (typically from "expr" to "CAST(expr as
> > > X)"). This implies that a column referenced as #expr down the plan may
> > not
> > > exist depending on the input type of the initial projection/scan.
> > >
> > > Finding 1 and 2 IMO are a direct consequence of polymorphism and the
> only
> > > way to not handle them is by not supporting polymorphism (e.g. the user
> > > registers sqrt_f32 and sqrt_f64, etc).
> > >
> > > Finding 3 can be addressed in at least three ways:
> > >
> > > A) make the optimizer rewrite the expression as "CAST(expr as X) AS
> > expr",
> > > so that it retains its original name. This hides the actual
> expression's
> > > calculation, but preserves its original name.
> > > B) accept that expressions can always change its name, which means that
> > the
> > > user should be mindful when writing `col("SELECT sqrt(x) FROM t"`, as
> the
> > > column name may end up being called `"sqrt(CAST(x as X))"`.
> > > C) Do not support polymorphic functions
> > >
> > > Note that we currently already experience effects 1-3, it is just that
> we
> > > use so few polymorphic functions that these seldomly present
> themselves.
> > It
> > > was while working on [1,2,3] that I start painting the bigger picture.
> > >
> > > Some questions:
> > > 1. should continue down the path of polymorphic functions?
> > > 2. if yes, how do handle finding 3?
> > >
> > > Looking at the current code base, I am confident that we can address
> the
> > > technical issues to support polymorphic functions. However, it would be
> > > interesting to have your thoughts on this.
> > >
> > > [1] https://github.com/apache/arrow/pull/7967
> > > [2] https://github.com/apache/arrow/pull/7971
> > > [3] https://github.com/apache/arrow/pull/7974
> > >
> >
>

Re: Polymorphism in DataFusion

Posted by Jorge Cardoso Leitão <jo...@gmail.com>.
Thanks Andrew,

I am not sure I articulated this well enough, though, as I did not specify
the type of polymorphism that I was thinking about. xD

My question was/is about whether we should accept functions whose return
type is known during planning, and constant during execution, or whether
their return types must be constant both during planning and execution. I
do not think we should support variable types during execution for the
reasons that you enumerated. If by runtime polymorphism you mean changing
types during execution, then I very much agree with you that that is a
no-no.

During planning, though, we have options: should we allow users to write
something like `my_operation(f32|f64) -> (f32|f64)`, in which the type is
inferred after we know the function's input in the logical plan, or should
we not allow that and require users to register `my_operation_f32(f32)` and
`my_operation_f64(f64)` separately? The three findings that I mentioned
above refer to planned polymorphism: return type is resolved during
planning (and constant during execution).

The biggest use-case IMO for polymorphism during planning is for functions
that yield structures/lists of values (a-la collect_list) whose type can
only be inferred after we know the functions' input type (array(f32) vs
array(f64)), and whose implementation can be generalized via a macro +
match.
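
A minimal sketch of the "macro + match" idea, using simplified, hypothetical
stand-ins (`DataType`, `Column`, `ListValue`) rather than arrow's arrays: the
return type is computed from the input type during planning, and a macro
generates one match arm per supported type for execution.

```rust
// Hypothetical stand-ins for arrow's DataType and array types.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Float32,
    Float64,
    List(Box<DataType>),
}

#[derive(Debug, PartialEq)]
pub enum Column {
    Float32(Vec<f32>),
    Float64(Vec<f64>),
}

#[derive(Debug, PartialEq)]
pub enum ListValue {
    Float32(Vec<f32>),
    Float64(Vec<f64>),
}

// Planning: the return type depends on the input type, but it is fixed
// once the input type is known.
pub fn collect_list_return_type(input: &DataType) -> Result<DataType, String> {
    match input {
        DataType::Float32 | DataType::Float64 => {
            Ok(DataType::List(Box::new(input.clone())))
        }
        other => Err(format!("collect_list does not support {:?}", other)),
    }
}

// Execution: one match arm per supported type, generated by the macro.
macro_rules! collect_list {
    ($column:expr, $($variant:ident),+) => {
        match $column {
            $(Column::$variant(values) => ListValue::$variant(values.clone()),)+
        }
    };
}

pub fn collect_list(column: &Column) -> ListValue {
    collect_list!(column, Float32, Float64)
}
```

Supporting another element type in this sketch means adding a variant and
listing it in the macro call; the body is not duplicated by hand.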

From a technical point of view, we currently have functions with variable
types (all binary operators' return type depends on the lhs' type, sum,
max/min, etc.), and we have to handle the main planning challenges already.
In this context, the questions are something like:

1. should we continue to have them or should we move away from them?
2.1 If not, what should we do with them? E.g. declare sum_i32, sum_i64,
etc., that have a single return type?
2.2 if yes, should we allow users to register these types of functions, or
should these only be allowed within DataFusion's code base?

Best,
Jorge



On Mon, Aug 17, 2020 at 9:53 PM Andrew Lamb <al...@influxdata.com> wrote:

> In my opinion, I suggest we do not continue down the path of (runtime)
> polymorphic functions unless a compelling use case for them can be
> articulated.
>
> You have done a great job articulating some of the implementation
> challenges, but I personally struggle to describe when, as a user of
> DataFusion, I would want to write a (runtime) polymorphic function.
>
> A function with runtime polymorphism I think would mean the UDF could
> handle the type changing *at runtime*: record batches could come in with
> multiple different types during the same execution. I can't think of
> examples where this behavior would be desirable or necessary.
>
> The existing DataFusion codebase seems to assume (reasonably in my opinion)
> that the schema of each Logical / Physical plan node is known at planning
> time and it does not change at runtime.
>
> Most query optimizers (and compilers for that matter) take advantage of
> plan (compile) time type information to make runtime more efficient.  Also,
> it seems like other database / runtime systems such as mysql[1] and
> postgres[2] require the UDF creator to explicitly specify the return type
> as well. I think we should consider the simpler semantics of "1 return type
> for each UDF" to make it easier on people writing UDFs as well as
> simplifying the implementation of DataFusion itself.
>
> Andrew
>
> [1] https://dev.mysql.com/doc/refman/8.0/en/create-function-udf.html
> [2] https://www.postgresql.org/docs/12/sql-createfunction.html
>

Re: Polymorphism in DataFusion

Posted by Andrew Lamb <al...@influxdata.com>.
I suggest we do not continue down the path of (runtime) polymorphic
functions unless a compelling use case for them can be articulated.

You have done a great job articulating some of the implementation
challenges, but I personally struggle to describe when, as a user of
DataFusion, I would want to write a (runtime) polymorphic function.

I think a function with runtime polymorphism would mean that the UDF could
handle the type changing *at runtime*: record batches could arrive with
different types during the same execution. I can't think of examples where
this behavior would be desirable or necessary.
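
To make the distinction concrete, here is a minimal sketch of what a
runtime-polymorphic function could look like. It does not use DataFusion's or
Arrow's actual API: `Column` is a hypothetical stand-in for an Arrow array, and
`sqrt_dynamic` is an illustrative name. The point is only that the type is
inspected on every call, so the output type depends on whatever each batch
happens to contain.

```rust
// Hypothetical stand-in for an Arrow array; this is NOT the real `ArrayRef`.
#[derive(Debug, Clone, PartialEq)]
enum Column {
    Float32(Vec<f32>),
    Float64(Vec<f64>),
}

// A runtime-polymorphic sqrt (illustrative name): the input type is matched
// on every call, and the output type follows the type of the incoming batch.
fn sqrt_dynamic(input: &Column) -> Column {
    match input {
        Column::Float32(v) => Column::Float32(v.iter().map(|x| x.sqrt()).collect()),
        Column::Float64(v) => Column::Float64(v.iter().map(|x| x.sqrt()).collect()),
    }
}

fn main() {
    // Two batches of the "same" column arriving with different types
    // during one execution: the scenario described above.
    let batches = vec![
        Column::Float32(vec![4.0, 9.0]),
        Column::Float64(vec![16.0]),
    ];
    for batch in &batches {
        println!("{:?}", sqrt_dynamic(batch));
    }
}
```

In this sketch nothing downstream of `sqrt_dynamic` can know the output type
before the batch arrives, which is the property being questioned here.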

The existing DataFusion codebase seems to assume (reasonably, in my opinion)
that the schema of each Logical / Physical plan node is known at planning
time and does not change at runtime.

Most query optimizers (and compilers, for that matter) take advantage of
plan-time (compile-time) type information to make runtime more efficient.
Also, other database / runtime systems such as MySQL [1] and PostgreSQL [2]
seem to require the UDF creator to explicitly specify the return type as
well. I think we should consider the simpler semantics of "one return type
for each UDF" to make it easier for people writing UDFs, as well as to
simplify the implementation of DataFusion itself.
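
As a rough illustration of the "one return type for each UDF" semantics, the
sketch below registers `sqrt_f32` and `sqrt_f64` (the names used in the
original message) as separate functions with fixed signatures, so the return
type can be resolved at planning time. `DataType`, `ScalarUdf`, and `Registry`
are hypothetical types written for this example; they are not DataFusion's
actual API.

```rust
use std::collections::HashMap;

// Hypothetical, simplified set of data types (not Arrow's `DataType`).
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Float32,
    Float64,
}

// Hypothetical UDF descriptor: exactly one signature and one return type.
struct ScalarUdf {
    arg_types: Vec<DataType>,
    return_type: DataType,
}

// Hypothetical function registry consulted by the planner.
struct Registry {
    udfs: HashMap<String, ScalarUdf>,
}

impl Registry {
    fn new() -> Self {
        Registry { udfs: HashMap::new() }
    }

    fn register(&mut self, name: &str, arg_types: Vec<DataType>, return_type: DataType) {
        self.udfs
            .insert(name.to_string(), ScalarUdf { arg_types, return_type });
    }

    // Plan-time resolution: the return type is known before any data is read,
    // and a signature mismatch is reported as a planning error.
    fn return_type(&self, name: &str, args: &[DataType]) -> Result<DataType, String> {
        let udf = self
            .udfs
            .get(name)
            .ok_or_else(|| format!("unknown function {}", name))?;
        if udf.arg_types.as_slice() == args {
            Ok(udf.return_type)
        } else {
            Err(format!("{} expects {:?}, got {:?}", name, udf.arg_types, args))
        }
    }
}

fn main() {
    let mut registry = Registry::new();
    registry.register("sqrt_f32", vec![DataType::Float32], DataType::Float32);
    registry.register("sqrt_f64", vec![DataType::Float64], DataType::Float64);

    // Resolved at planning time, without looking at any record batch.
    println!("{:?}", registry.return_type("sqrt_f64", &[DataType::Float64]));
    // A mismatched argument type fails at planning time instead of at runtime.
    println!("{:?}", registry.return_type("sqrt_f32", &[DataType::Float64]));
}
```

The trade-off in this sketch is that the user registers one function per
input type, in exchange for every plan node having a single, known schema.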

Andrew

[1] https://dev.mysql.com/doc/refman/8.0/en/create-function-udf.html
[2] https://www.postgresql.org/docs/12/sql-createfunction.html

On Mon, Aug 17, 2020 at 12:31 PM Jorge Cardoso Leitão <
jorgecarleitao@gmail.com> wrote:

> Hi,
>
> Recently, I have been contributing to DataFusion, and I would like to bring
> to your attention a question that I faced while PRing to DataFusion that
> IMO needs some alignment :)
>
> DataFusion supports scalar UDFs: functions that expect a type, return a
> type, and perform some operation on the data (à la Spark UDFs). However,
> the execution engine is actually dynamically typed:
>
> * a scalar UDF receives an &[ArrayRef] that must be downcast accordingly
> * a scalar UDF must select the builder that matches its signature, so that
> its return type matches the ArrayRef that it returns.
>
> This suggests that we can treat functions as polymorphic: as long as the
> function handles the different types (e.g. via match), we are good. We
> currently support neither multiple input types nor variable return types
> in function signatures.
>
> Our current (non-UDF) scalar and aggregate functions are already
> polymorphic on both their input and return type: sum(i32) -> i64, sum(f64)
> -> f64, "a + b". I have been working on PRs to add polymorphic support
> to scalar UDFs (e.g. sqrt() can take float32 and float64) [1,3], as well as
> polymorphic aggregate UDFs [2], so that we can extend our offering to more
> interesting functions such as "length(t) -> uint", "array(c1, c2)",
> "collect_list(t) -> array(t)", etc.
>
> However, while working on [1,2,3], I reached some non-trivial findings that
> I would like to share:
>
> Finding 1: to support polymorphic functions, our logical and physical
> expressions (Expr and PhysicalExpr) need to be polymorphic as well: once a
> function is polymorphic, any expression containing it is also polymorphic.
>
> Finding 2: when a polymorphic expression passes through our type coercer
> optimizer (which tries to coerce types to match a function's signature), it
> may be re-cast to a different type. If the return type changes, the
> optimizer may need to re-cast operations that depend on the function call
> (e.g. a projection followed by an aggregation may need a re-cast on the
> projection and on the aggregation).
>
> Finding 3: when an expression passes through our type coercer optimizer and
> is re-cast, its name changes (typically from "expr" to "CAST(expr as
> X)"). This implies that a column referenced as #expr down the plan may not
> exist, depending on the input type of the initial projection/scan.
>
> Findings 1 and 2 are, IMO, a direct consequence of polymorphism, and the
> only way to avoid handling them is to not support polymorphism (e.g. the
> user registers sqrt_f32 and sqrt_f64, etc).
>
> Finding 3 can be addressed in at least three ways:
>
> A) make the optimizer rewrite the expression as "CAST(expr as X) AS expr",
> so that it retains its original name. This hides the actual expression's
> calculation, but preserves its original name.
> B) accept that expressions can always change their names, which means that
> the user should be mindful when writing `col("SELECT sqrt(x) FROM t")`, as
> the column may end up being called `"sqrt(CAST(x as X))"`.
> C) do not support polymorphic functions.
>
> Note that we already experience effects 1-3; it is just that we use so few
> polymorphic functions that they seldom present themselves. It was while
> working on [1,2,3] that I started to see the bigger picture.
>
> Some questions:
> 1. should we continue down the path of polymorphic functions?
> 2. if yes, how do we handle finding 3?
>
> Looking at the current code base, I am confident that we can address the
> technical issues to support polymorphic functions. However, it would be
> interesting to have your thoughts on this.
>
> [1] https://github.com/apache/arrow/pull/7967
> [2] https://github.com/apache/arrow/pull/7971
> [3] https://github.com/apache/arrow/pull/7974
>