You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@arrow.apache.org by Yaron Gvili <rt...@hotmail.com> on 2022/05/15 12:02:28 UTC

design for Python UDFs in an Ibis/Substrait/Arrow workflow

Hi,

I'm working on a Python UDFs PoC and would like to get the community's feedback on its design.

The goal of this PoC is to enable a user to integrate Python UDFs in an Ibis/Substrait/Arrow workflow. The basic idea is that the user would create an Ibis expression that includes Python UDFs implemented by the user, then use a single invocation to:

  1.  produce a Substrait plan for the Ibis expression;
  2.  deliver the Substrait plan to Arrow;
  3.  consume the Substrait plan in Arrow to obtain an execution plan;
  4.  run the execution plan;
  5.  deliver back the resulting table outputs of the plan.

I already have the above steps implemented, with support for some existing execution nodes and compute functions in Arrow, but without the support for Python UDFs, which is the focus of this discussion. To add this support I am thinking about a design where:

  *   Ibis is extended to support an expression of type Python UDF, which carries (text-form) serialized UDF code (likely using cloudpickle)
  *   Ibis-Substrait is extended to support an extension function of type Python UDF, which also carries (text-within-protobuf-form) serialized UDF code
  *   PyArrow is extended to process a Substrait plan that includes serialized Python UDFs using these steps:
     *   Deserialize the Python UDFs
     *   Register the Python UDFs (preferably, in a local scope that is cleared after processing ends)
     *   Run the execution plan corresponding to the Substrait plan
     *   Return the resulting table outputs of the plan

Note that this design requires the user to drive PyArrow. It does not support driving Arrow C++ to process such a Substrait plan (it does not start an embedded Python interpreter).


Cheers,
Yaron.

Re: design for Python UDFs in an Ibis/Substrait/Arrow workflow

Posted by Yaron Gvili <rt...@hotmail.com>.
Thanks, Weston.

I was able to locally build a PoC of this to work for UDFs with some limitations - see details below. I'd like to get feedback about whether this approach is acceptable. If so, I could start contributing its components.

With respect to Li's design points:

  1.  A way to serialize the user function (and function parameters): This works using cloudpickle.dumps(f) where f is the Python UDF function.
  2.  Add a Substrait relation/expression that captures the UDF: This works using a new "UserDefinedFunction" protobuf message under the "ExtensionFunction" message. This also provides fail-safety for legacy Substrait-consumers - if the Substrait-consumer is unaware of the "UserDefinedFunction" message then it would error due to not finding the described function by name, which the Substrait-producer is required to make different from all the names in the default extension-id-registry.
  3.  Deserialize the Substrait relation/expression in Arrow compute and execute the UDF: This works by extending the current Scalar UDF prototype with automatic registration given the UDF function declarations embedded in the Substrait plan. The function extension registration is done in a per-plan scope, without modifying the global/default extension-if-registry and while avoiding cross-plan naming-conflicts.

With respect to my design points:

  1.  There was no need to support a serialized UDF at the Ibis level; the existing vectorized UDF support was sufficient for capturing UDFs.
  2.  As planned, Ibis-Substrait is extended to support an extension function of type Python UDF.
  3.  Almost as planned, PyArrow is extended to process a Substrait plan that includes serialized Python UDFs, yet the result is a record-batch-reader, rather than a table as planned.

Currently, the PoC only supports flat scalar-valued element-wise-type Python UDFs. I expect that adding later on support for table/dataset/struct-valued and analytic/reduction-type Python UDFs should not be too hard.


Yaron.
________________________________
From: Li Jin <ic...@gmail.com>
Sent: Tuesday, May 17, 2022 4:46 PM
To: dev@arrow.apache.org <de...@arrow.apache.org>
Subject: Re: design for Python UDFs in an Ibis/Substrait/Arrow workflow

Thanks Weston.

> Good questions.  I haven't given this a whole lot of thought yet so
> take these suggestions with a grain of salt.  Probably the minimal
> code change approach would be to implement [2] and call the unregister
> from the ExecPlan's destructor.  However, this is going to eventually
> lead to naming conflicts if multiple UDFs are running in parallel.  We
> could generate unique names to work around this.  We could also
> consider a second function registry, unique to the plan, combined with
> the master function registry via some kind of facade registry that
> prefer's local UDFs and falls back to process UDFs.  I think that
> might be the best long term solution.

If I understand this correctly, with the [1], we can potentially
deserialize the
UDF in the Python process calling the Python consumer API (likely to be the
user Python process), call PyArrow API to register the UDF, (maybe with a
temporary
name with UUID to avoid conflict) then construct a UDF ExecNode with that
name?
I think this would not but not sure if I missed anything. I think a second
registry would
help and be more clean, but with the current MVP purpose the temporary name
+ unregister
from destructor probably works.

On Mon, May 16, 2022 at 9:53 PM Weston Pace <we...@gmail.com> wrote:

> 1. This is probably best discussed on the Ibis mailing list (or
> Github?  I'm not sure how the Ibis community communicates).
>
> 2. If we consider "encoding the embedded function in Substrait" to be
> part of (2) then this can be discussed in the Substrait community.
> That being said, in the sync call last week we had a brief discussion
> and to start with the EmbeddedFunction message and add an "any"
> version and then propose a PR to standardize it.  We will need to find
> some spot to encode the embedded function into the plan itself
> however.
>
> 3. A PR is in progress (pretty close to merging) to expose the
> Substrait consumer to python.  I think this will be good enough for
> MVP purposes.  Check out [1].
>
> > we are not sure how to proceed with (3), so wondering how to
> > register/execute the deserialize the UDF from substrait and execute it
> > using existing UDF code. Any thoughts?
>
> Good questions.  I haven't given this a whole lot of thought yet so
> take these suggestions with a grain of salt.  Probably the minimal
> code change approach would be to implement [2] and call the unregister
> from the ExecPlan's destructor.  However, this is going to eventually
> lead to naming conflicts if multiple UDFs are running in parallel.  We
> could generate unique names to work around this.  We could also
> consider a second function registry, unique to the plan, combined with
> the master function registry via some kind of facade registry that
> prefer's local UDFs and falls back to process UDFs.  I think that
> might be the best long term solution.
>
> [1] https://github.com/apache/arrow/pull/12672
> [2] https://issues.apache.org/jira/browse/ARROW-16211
>
> On Mon, May 16, 2022 at 5:06 AM Li Jin <ic...@gmail.com> wrote:
> >
> > For context, this is a continuation of a previous email discussion: "RPC:
> > Out of Process Python UDFs in Arrow Compute" where we identified
> reasonable
> > steps to solve are:
> > ostef Process Python UDFs
> > (1) A way to serialize the user function (and function parameters) (in
> the
> > doc I proposed to use cloudpickle based approach similar to how Spark
> does
> > it) (2) Add a Substrait relation/expression that captures the the UDF (3)
> > Deserialize the Substrait relation/expression in Arrow compute and
> execute
> > the UDF (either using the approach in the current Scalar UDF prototype or
> > do sth else)
> > (Same as the Yaron layout above).
> >
> > Now I think we have reasonable solutions are (1) and (2) (at least for
> PoC
> > purpose), but we are not sure how to proceed with (3), so wondering how
> to
> > register/execute the deserialize the UDF from substrait and execute it
> > using existing UDF code. Any thoughts?
> >
> > Li
> >
> >
> > On Sun, May 15, 2022 at 8:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'm working on a Python UDFs PoC and would like to get the community's
> > > feedback on its design.
> > >
> > > The goal of this PoC is to enable a user to integrate Python UDFs in an
> > > Ibis/Substrait/Arrow workflow. The basic idea is that the user would
> create
> > > an Ibis expression that includes Python UDFs implemented by the user,
> then
> > > use a single invocation to:
> > >
> > >   1.  produce a Substrait plan for the Ibis expression;
> > >   2.  deliver the Substrait plan to Arrow;
> > >   3.  consume the Substrait plan in Arrow to obtain an execution plan;
> > >   4.  run the execution plan;
> > >   5.  deliver back the resulting table outputs of the plan.
> > >
> > > I already have the above steps implemented, with support for some
> existing
> > > execution nodes and compute functions in Arrow, but without the
> support for
> > > Python UDFs, which is the focus of this discussion. To add this
> support I
> > > am thinking about a design where:
> > >
> > >   *   Ibis is extended to support an expression of type Python UDF,
> which
> > > carries (text-form) serialized UDF code (likely using cloudpickle)
> > >   *   Ibis-Substrait is extended to support an extension function of
> type
> > > Python UDF, which also carries (text-within-protobuf-form) serialized
> UDF
> > > code
> > >   *   PyArrow is extended to process a Substrait plan that includes
> > > serialized Python UDFs using these steps:
> > >      *   Deserialize the Python UDFs
> > >      *   Register the Python UDFs (preferably, in a local scope that is
> > > cleared after processing ends)
> > >      *   Run the execution plan corresponding to the Substrait plan
> > >      *   Return the resulting table outputs of the plan
> > >
> > > Note that this design requires the user to drive PyArrow. It does not
> > > support driving Arrow C++ to process such a Substrait plan (it does not
> > > start an embedded Python interpreter).
> > >
> > >
> > > Cheers,
> > > Yaron.
> > >
>

Re: design for Python UDFs in an Ibis/Substrait/Arrow workflow

Posted by Li Jin <ic...@gmail.com>.
Thanks Weston.

> Good questions.  I haven't given this a whole lot of thought yet so
> take these suggestions with a grain of salt.  Probably the minimal
> code change approach would be to implement [2] and call the unregister
> from the ExecPlan's destructor.  However, this is going to eventually
> lead to naming conflicts if multiple UDFs are running in parallel.  We
> could generate unique names to work around this.  We could also
> consider a second function registry, unique to the plan, combined with
> the master function registry via some kind of facade registry that
> prefer's local UDFs and falls back to process UDFs.  I think that
> might be the best long term solution.

If I understand this correctly, with the [1], we can potentially
deserialize the
UDF in the Python process calling the Python consumer API (likely to be the
user Python process), call PyArrow API to register the UDF, (maybe with a
temporary
name with UUID to avoid conflict) then construct a UDF ExecNode with that
name?
I think this would not but not sure if I missed anything. I think a second
registry would
help and be more clean, but with the current MVP purpose the temporary name
+ unregister
from destructor probably works.

On Mon, May 16, 2022 at 9:53 PM Weston Pace <we...@gmail.com> wrote:

> 1. This is probably best discussed on the Ibis mailing list (or
> Github?  I'm not sure how the Ibis community communicates).
>
> 2. If we consider "encoding the embedded function in Substrait" to be
> part of (2) then this can be discussed in the Substrait community.
> That being said, in the sync call last week we had a brief discussion
> and to start with the EmbeddedFunction message and add an "any"
> version and then propose a PR to standardize it.  We will need to find
> some spot to encode the embedded function into the plan itself
> however.
>
> 3. A PR is in progress (pretty close to merging) to expose the
> Substrait consumer to python.  I think this will be good enough for
> MVP purposes.  Check out [1].
>
> > we are not sure how to proceed with (3), so wondering how to
> > register/execute the deserialize the UDF from substrait and execute it
> > using existing UDF code. Any thoughts?
>
> Good questions.  I haven't given this a whole lot of thought yet so
> take these suggestions with a grain of salt.  Probably the minimal
> code change approach would be to implement [2] and call the unregister
> from the ExecPlan's destructor.  However, this is going to eventually
> lead to naming conflicts if multiple UDFs are running in parallel.  We
> could generate unique names to work around this.  We could also
> consider a second function registry, unique to the plan, combined with
> the master function registry via some kind of facade registry that
> prefer's local UDFs and falls back to process UDFs.  I think that
> might be the best long term solution.
>
> [1] https://github.com/apache/arrow/pull/12672
> [2] https://issues.apache.org/jira/browse/ARROW-16211
>
> On Mon, May 16, 2022 at 5:06 AM Li Jin <ic...@gmail.com> wrote:
> >
> > For context, this is a continuation of a previous email discussion: "RPC:
> > Out of Process Python UDFs in Arrow Compute" where we identified
> reasonable
> > steps to solve are:
> > ostef Process Python UDFs
> > (1) A way to serialize the user function (and function parameters) (in
> the
> > doc I proposed to use cloudpickle based approach similar to how Spark
> does
> > it) (2) Add a Substrait relation/expression that captures the the UDF (3)
> > Deserialize the Substrait relation/expression in Arrow compute and
> execute
> > the UDF (either using the approach in the current Scalar UDF prototype or
> > do sth else)
> > (Same as the Yaron layout above).
> >
> > Now I think we have reasonable solutions are (1) and (2) (at least for
> PoC
> > purpose), but we are not sure how to proceed with (3), so wondering how
> to
> > register/execute the deserialize the UDF from substrait and execute it
> > using existing UDF code. Any thoughts?
> >
> > Li
> >
> >
> > On Sun, May 15, 2022 at 8:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'm working on a Python UDFs PoC and would like to get the community's
> > > feedback on its design.
> > >
> > > The goal of this PoC is to enable a user to integrate Python UDFs in an
> > > Ibis/Substrait/Arrow workflow. The basic idea is that the user would
> create
> > > an Ibis expression that includes Python UDFs implemented by the user,
> then
> > > use a single invocation to:
> > >
> > >   1.  produce a Substrait plan for the Ibis expression;
> > >   2.  deliver the Substrait plan to Arrow;
> > >   3.  consume the Substrait plan in Arrow to obtain an execution plan;
> > >   4.  run the execution plan;
> > >   5.  deliver back the resulting table outputs of the plan.
> > >
> > > I already have the above steps implemented, with support for some
> existing
> > > execution nodes and compute functions in Arrow, but without the
> support for
> > > Python UDFs, which is the focus of this discussion. To add this
> support I
> > > am thinking about a design where:
> > >
> > >   *   Ibis is extended to support an expression of type Python UDF,
> which
> > > carries (text-form) serialized UDF code (likely using cloudpickle)
> > >   *   Ibis-Substrait is extended to support an extension function of
> type
> > > Python UDF, which also carries (text-within-protobuf-form) serialized
> UDF
> > > code
> > >   *   PyArrow is extended to process a Substrait plan that includes
> > > serialized Python UDFs using these steps:
> > >      *   Deserialize the Python UDFs
> > >      *   Register the Python UDFs (preferably, in a local scope that is
> > > cleared after processing ends)
> > >      *   Run the execution plan corresponding to the Substrait plan
> > >      *   Return the resulting table outputs of the plan
> > >
> > > Note that this design requires the user to drive PyArrow. It does not
> > > support driving Arrow C++ to process such a Substrait plan (it does not
> > > start an embedded Python interpreter).
> > >
> > >
> > > Cheers,
> > > Yaron.
> > >
>

Re: design for Python UDFs in an Ibis/Substrait/Arrow workflow

Posted by Weston Pace <we...@gmail.com>.
1. This is probably best discussed on the Ibis mailing list (or
Github?  I'm not sure how the Ibis community communicates).

2. If we consider "encoding the embedded function in Substrait" to be
part of (2) then this can be discussed in the Substrait community.
That being said, in the sync call last week we had a brief discussion
and to start with the EmbeddedFunction message and add an "any"
version and then propose a PR to standardize it.  We will need to find
some spot to encode the embedded function into the plan itself
however.

3. A PR is in progress (pretty close to merging) to expose the
Substrait consumer to python.  I think this will be good enough for
MVP purposes.  Check out [1].

> we are not sure how to proceed with (3), so wondering how to
> register/execute the deserialize the UDF from substrait and execute it
> using existing UDF code. Any thoughts?

Good questions.  I haven't given this a whole lot of thought yet so
take these suggestions with a grain of salt.  Probably the minimal
code change approach would be to implement [2] and call the unregister
from the ExecPlan's destructor.  However, this is going to eventually
lead to naming conflicts if multiple UDFs are running in parallel.  We
could generate unique names to work around this.  We could also
consider a second function registry, unique to the plan, combined with
the master function registry via some kind of facade registry that
prefer's local UDFs and falls back to process UDFs.  I think that
might be the best long term solution.

[1] https://github.com/apache/arrow/pull/12672
[2] https://issues.apache.org/jira/browse/ARROW-16211

On Mon, May 16, 2022 at 5:06 AM Li Jin <ic...@gmail.com> wrote:
>
> For context, this is a continuation of a previous email discussion: "RPC:
> Out of Process Python UDFs in Arrow Compute" where we identified reasonable
> steps to solve are:
> ostef Process Python UDFs
> (1) A way to serialize the user function (and function parameters) (in the
> doc I proposed to use cloudpickle based approach similar to how Spark does
> it) (2) Add a Substrait relation/expression that captures the the UDF (3)
> Deserialize the Substrait relation/expression in Arrow compute and execute
> the UDF (either using the approach in the current Scalar UDF prototype or
> do sth else)
> (Same as the Yaron layout above).
>
> Now I think we have reasonable solutions are (1) and (2) (at least for PoC
> purpose), but we are not sure how to proceed with (3), so wondering how to
> register/execute the deserialize the UDF from substrait and execute it
> using existing UDF code. Any thoughts?
>
> Li
>
>
> On Sun, May 15, 2022 at 8:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
> > Hi,
> >
> > I'm working on a Python UDFs PoC and would like to get the community's
> > feedback on its design.
> >
> > The goal of this PoC is to enable a user to integrate Python UDFs in an
> > Ibis/Substrait/Arrow workflow. The basic idea is that the user would create
> > an Ibis expression that includes Python UDFs implemented by the user, then
> > use a single invocation to:
> >
> >   1.  produce a Substrait plan for the Ibis expression;
> >   2.  deliver the Substrait plan to Arrow;
> >   3.  consume the Substrait plan in Arrow to obtain an execution plan;
> >   4.  run the execution plan;
> >   5.  deliver back the resulting table outputs of the plan.
> >
> > I already have the above steps implemented, with support for some existing
> > execution nodes and compute functions in Arrow, but without the support for
> > Python UDFs, which is the focus of this discussion. To add this support I
> > am thinking about a design where:
> >
> >   *   Ibis is extended to support an expression of type Python UDF, which
> > carries (text-form) serialized UDF code (likely using cloudpickle)
> >   *   Ibis-Substrait is extended to support an extension function of type
> > Python UDF, which also carries (text-within-protobuf-form) serialized UDF
> > code
> >   *   PyArrow is extended to process a Substrait plan that includes
> > serialized Python UDFs using these steps:
> >      *   Deserialize the Python UDFs
> >      *   Register the Python UDFs (preferably, in a local scope that is
> > cleared after processing ends)
> >      *   Run the execution plan corresponding to the Substrait plan
> >      *   Return the resulting table outputs of the plan
> >
> > Note that this design requires the user to drive PyArrow. It does not
> > support driving Arrow C++ to process such a Substrait plan (it does not
> > start an embedded Python interpreter).
> >
> >
> > Cheers,
> > Yaron.
> >

Re: design for Python UDFs in an Ibis/Substrait/Arrow workflow

Posted by Li Jin <ic...@gmail.com>.
For context, this is a continuation of a previous email discussion: "RPC:
Out of Process Python UDFs in Arrow Compute" where we identified reasonable
steps to solve are:
ostef Process Python UDFs
(1) A way to serialize the user function (and function parameters) (in the
doc I proposed to use cloudpickle based approach similar to how Spark does
it) (2) Add a Substrait relation/expression that captures the the UDF (3)
Deserialize the Substrait relation/expression in Arrow compute and execute
the UDF (either using the approach in the current Scalar UDF prototype or
do sth else)
(Same as the Yaron layout above).

Now I think we have reasonable solutions are (1) and (2) (at least for PoC
purpose), but we are not sure how to proceed with (3), so wondering how to
register/execute the deserialize the UDF from substrait and execute it
using existing UDF code. Any thoughts?

Li


On Sun, May 15, 2022 at 8:02 AM Yaron Gvili <rt...@hotmail.com> wrote:

> Hi,
>
> I'm working on a Python UDFs PoC and would like to get the community's
> feedback on its design.
>
> The goal of this PoC is to enable a user to integrate Python UDFs in an
> Ibis/Substrait/Arrow workflow. The basic idea is that the user would create
> an Ibis expression that includes Python UDFs implemented by the user, then
> use a single invocation to:
>
>   1.  produce a Substrait plan for the Ibis expression;
>   2.  deliver the Substrait plan to Arrow;
>   3.  consume the Substrait plan in Arrow to obtain an execution plan;
>   4.  run the execution plan;
>   5.  deliver back the resulting table outputs of the plan.
>
> I already have the above steps implemented, with support for some existing
> execution nodes and compute functions in Arrow, but without the support for
> Python UDFs, which is the focus of this discussion. To add this support I
> am thinking about a design where:
>
>   *   Ibis is extended to support an expression of type Python UDF, which
> carries (text-form) serialized UDF code (likely using cloudpickle)
>   *   Ibis-Substrait is extended to support an extension function of type
> Python UDF, which also carries (text-within-protobuf-form) serialized UDF
> code
>   *   PyArrow is extended to process a Substrait plan that includes
> serialized Python UDFs using these steps:
>      *   Deserialize the Python UDFs
>      *   Register the Python UDFs (preferably, in a local scope that is
> cleared after processing ends)
>      *   Run the execution plan corresponding to the Substrait plan
>      *   Return the resulting table outputs of the plan
>
> Note that this design requires the user to drive PyArrow. It does not
> support driving Arrow C++ to process such a Substrait plan (it does not
> start an embedded Python interpreter).
>
>
> Cheers,
> Yaron.
>