Posted to dev@arrow.apache.org by Yaron Gvili <rt...@hotmail.com> on 2022/06/03 15:31:33 UTC

data-source UDFs

Hi,

I'm working on support for data-source UDFs and would like to get feedback about the design I have in mind for it.

By support for data-source UDFs, at a basic level, I mean enabling a user to use PyArrow APIs to define a record-batch-generating function, implemented in Python, that can easily be plugged into a source-node in a streaming-engine execution plan. Such functions are similar to the existing scalar UDFs with zero inputs, but an important difference is that scalar UDFs are plugged and composed in expressions, whereas data-source UDFs would be plugged into a source-node.

Focusing on the Arrow and PyArrow parts (I'm leaving the Ibis and Ibis-Substrait parts out), the design I have in mind includes:

  *   In Arrow: Adding a new source-UDF kind of arrow::compute::Function, for functions that generate data. Such functions would be registered in a FunctionRegistry but would neither be used in scalar expressions nor composed.
  *   In Arrow: Adding SourceUdfContext and SourceUdfOptions (similar to ScalarUdfContext and ScalarUdfOptions) in "cpp/src/arrow/python/udf.h".
  *   In Arrow: Adding a UdfSourceExecNode into which a (source-UDF-kind of) function can be plugged.
  *   In PyArrow: Following the design of scalar UDFs, and hopefully reusing much of it.
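
To make the intended PyArrow-side usage concrete, here's a rough sketch. The
function register_source_function and its arguments are hypothetical names I'm
using for illustration only, mirroring the scalar-UDF registration in
https://github.com/apache/arrow/pull/12590; nothing in this snippet exists yet:

    import pyarrow as pa
    import pyarrow.compute as pc

    out_schema = pa.schema([("x", pa.int64())])

    def my_source(ctx):
        # Called once per plan execution; returns a fresh generator of RecordBatches.
        def gen():
            for i in range(3):
                yield pa.record_batch([pa.array([i, i + 1, i + 2])], schema=out_schema)
        return gen()

    # Hypothetical registration API (does not exist yet), mirroring
    # pc.register_scalar_function:
    pc.register_source_function(my_source, "my_source",
                                {"summary": "demo source",
                                 "description": "yields three batches"},
                                output_schema=out_schema)

A source-node in the plan would then look up "my_source" in the registry and call it to obtain the batch generator.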

Cheers,
Yaron.

Re: data-source UDFs

Posted by Vibhatha Abeykoon <vi...@gmail.com>.
This is a nice discussion, and it would be great to have a doc as David
suggested. I also think this would help us explore the usability piece
of Python UDFs. Maybe we can start documenting, go for a PoC, and
iteratively work toward an implementation.

On Tue, Jun 7, 2022 at 12:08 AM Weston Pace <we...@gmail.com> wrote:

> That makes sense to me.  These are my (perhaps naive) first thoughts
> at the shape such an implementation might take...
>
> * The function returns an iterator or a context manager that returns an
> iterator
> * If a context manager then __enter__ will be called when the engine
> is ready to start receiving data from that source
> * Each call to `next()` will be made from a new I/O thread task so the
> call is free to block until data has arrived
> * The engine will not make reentrant calls to `next()`.  If the source
> wants to do readahead it needs to manage that on its own.
> * At some point the engine will be finished
>   * If a context manager then __exit__ will be called.  Otherwise the
> engine will just drop the reference so the source is eligible for gc
>   * This might happen before the iterator is exhausted (user requested
> cancellation)
>   * This cleanup will happen in the destructor of the source node
>   * The call to __exit__ should not return until all files are closed,
> resources released, etc.
>
> This should be doable with the current source node and building a
> special async generator for these python sources.  The Substrait
> consumer would just need to instantiate this generator.  The function
> registry wouldn't need to be involved although I could imagine
> implementations in which it is.
>
> I don't know if your producers and consumers live on the same server
> or not.  If they do not it could be a little tricky for users to test
> / develop these sources since they probably depend on
> filesystems/connections/etc. that don't exist on the producer.
>
> I don't know if "context manager that returns an iterator" is a
> regular enough python concept to be intuitive or if it would be better
> to invent a new class at that point.  The key lifecycle moments are:
>
>  * Start - Called when the engine is ready.  Today this will be called
> as soon as the engine starts but there has been discussion that, in
> the future, we might want to delay this.  For example, if connected to
> a hash-join node you might not want to start reading the probe side
> immediately and instead focus on the build side.  I don't know if this
> will ever happen but it's probably better to plan for it.  Even in the
> case where we call it at plan start it would be useful to have this in
> case we create a plan and start it much later.
>  * Next - Simple (safe to block) method to get the next batch of data
>  * Finish - Called to clean up, potentially called before the source is
> fully read in the case of cancellation
>
> On Sun, Jun 5, 2022 at 9:54 PM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Any C++ extension point could become a pyarrow extension point if
> desired.
> >
> > Agreed. Still, the listed extension points are for supporting an
> implementation, not integration. The existing integration code for a Scalar
> UDF written in Python (See https://github.com/apache/arrow/pull/12590) is
> the kind of code I have in mind for supporting integration.
> >
> > > I have a few questions then.
> >
> > Before I answer each question, here's a wider view of the design I have
> in mind. The (Python code of the) data-source UDF is intended to be
> serialized and embedded in a Substrait plan (I already have this
> serialization and embedding implemented locally - PR to come). The UDF
> has/needs no arguments because its code is specific to the plan. The UDF
> returns a new stream/generator of RecordBatch instances to be used by the
> source-node in each execution of the plan. The UDF itself is made by a
> UDF-maker that does get arguments, which are used to derive values that are
> fixed into the UDF being made.
> >
> > To your questions in order:
> >
> >   1.  The data-source UDF is stateless and takes no arguments. The
> > UDF-maker does take arguments, while the RecordBatch stream/generator has
> > state.
> >   2.  The data-source UDF does not need any information from the
> execution plan; it does exactly the same thing in each execution - return a
> new RecordBatch stream/generator for the same data. Only the source-node
> that holds the data-source UDF interacts with the execution plan.
> >   3.  Values derived by the UDF-maker from its arguments are fixed into
> > the data-source UDF rather than passed to it. A path can certainly be
> > included in these values. Functions like "Close" may be implemented in the
> > RecordBatch stream/generator.
> >   4.  Using the function registry is actually not a critical part of the
> design I'm proposing, which is one reason I asked about it. It just seemed
> to me as a reasonable place for the execution plan to find the data-source
> UDF after it gets deserialized from the Substrait plan. If the function
> registry is not used, then the design would need some other registry or
> mechanism for passing the deserialized data source-UDF to the execution
> plan.
> >   5.  The data-source UDF is specific to an execution plan, so
> definitely specific to the user who created the Substrait plan in which it
> is embedded. Users (or perhaps authors) can share data-source-UDF-makers,
> or libraries thereof, for general-purpose use.
> >
> > Yaron.
> > ________________________________
> > From: Weston Pace <we...@gmail.com>
> > Sent: Saturday, June 4, 2022 2:41 PM
> > To: dev@arrow.apache.org <de...@arrow.apache.org>
> > Subject: Re: data-source UDFs
> >
> > > The former is about facilities (like extension points) for
> implementing custom data sources in Arrow whereas the latter is about
> facilities for integrating in PyArrow (existing or future) data sources
> written/wrapped in Python
> >
> > Any C++ extension point could become a pyarrow extension point if
> > desired.  For example, we already have a pure-python filesystem
> > adapter if a user wants to implement a filesystem purely in python
> > (this is how the fsspec adapter works I think).
> >
> > > I'm especially interested in feedback about the new function-kind and
> extensions of "cpp/src/arrow/python/udf.h" I proposed
> >
> > I have a few questions then.
> >
> > You mentioned that these functions would take no arguments.  Li's
> > rough example showed a path being provided to the UDF.
> >
> >   1. If the UDF takes no arguments then I will assume it must be
> > stateful.  That doesn't really match with the current idea of the
> > function registry.  Although it may be a problem we need to someday
> > solve for aggregate UDFs.  What would you imagine the lifecycle is for
> > this function?  If it is stateful would it not be more accurate to
> > call it something other than a function?
> >   2. If the UDF receives no arguments then do you need to communicate
> > any part of the execution plan (e.g. the Substrait plan) to the UDF?
> > How would you envision this happening?  Or is this a source that
> > always does the exact same thing regardless of the execution plan
> > (this could be valid if, for example, the user configures the source
> > with a plan independent of the execution plan before they register
> > it)?
> >   3. If the UDF is not stateful then it receives some arguments.  What
> > are they?  Does the UDF receive a path?  Or does the UDF generate
> > paths?  Are there two different kinds of UDFs, ones that receive paths
> > and ones that generate paths?
> >
> > Most of the extension points I described have more than one method.
> > Even in the APIs where I only listed one method there are some utility
> > methods like `Close` which I didn't list for simplicity.  While you
> > can generally represent a class as a "bag of functions" (a stateful
> > class is a "bag of functions that takes state as the first argument")
> > I find it to be more difficult for users to follow than just
> > registering a type with a defined interface.
> >
> >   4. Is there a particular reason in your use case for using the
> > function registry for this?
> >   5. Do you imagine these UDFs would always be specific to particular
> > users?  Or would it be possible for such a UDF to be shared as a
> > general purpose utility?
> >
> > On Sat, Jun 4, 2022 at 3:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
> > >
> > > Thanks for the detailed overview, Weston. I agree with David this
> would be very useful to have in a public doc.
> > >
> > > Weston and David's discussion is a good one, however, I see it as
> separate from the discussion I brought up. The former is about facilities
> (like extension points) for implementing custom data sources in Arrow
> whereas the latter is about facilities for integrating in PyArrow (existing
> or future) data sources written/wrapped in Python. In this latter
> discussion, I'm indifferent to the complexities of data source
> implementation. I'm especially interested in feedback about the new
> function-kind and extensions of "cpp/src/arrow/python/udf.h" I proposed, as
> well as possible alternatives to these, and more generally in reaching
> consensus about how a custom data-source written/wrapped in Python would
> get integrated.
> > >
> > > > At the moment, as we are not exposing the execution engine primitives
> > > > to the Python user, are you expecting to expose them with this approach?
> > > >
> > > > From our side, these APIs are not directly exposed to the end user, but
> > > > are rather primitives for us to build on top of.
> > >
> > > For clarity of discussion, I'd suggest distinguishing between a
> data-source-integrator and an Acero-user (or end-user), since in many use
> cases these are not the same person. When I wrote user, I meant a
> data-source-integrator. An Acero-user would not be directly using the
> facilities I proposed.
> > >
> > >
> > > Yaron.
> > > ________________________________
> > > From: David Li <li...@apache.org>
> > > Sent: Friday, June 3, 2022 5:53 PM
> > > To: dev@arrow.apache.org <de...@arrow.apache.org>
> > > Subject: Re: data-source UDFs
> > >
> > > Thanks for the overview of the different extension points, it's nice
> to see this laid out. (It would be great to find a place in the docs for
> this, IMO, or possibly as a blog post?)
> > >
> > > Just to chime in quickly here:
> > >
> > > For databases/Flight, my hope is that integrating ADBC into Arrow
> Datasets will take care of both. Plain Flight isn't quite well-defined
> enough to be meaningfully integrated (except perhaps via a generic "stream
> of batches" entrypoint), and even if we wanted to feed JDBC/ODBC into an
> ExecPlan, we'd have to do some work that would look roughly like writing an
> ADBC driver, so we may as well go that route.
> > >
> > > -David
> > >
> > > On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> > > > Efficiently reading from a data source is something that has a bit of
> > > > complexity (parsing files, connecting to remote data sources,
> managing
> > > > parallel reads, etc.)  Ideally we don't want users to have to
> reinvent
> > > > these things as they go.  The datasets module in Arrow-C++ has a lot
> > > > of code here already.
> > > >
> > > > So I think there is probably more than one extension point.  Some of
> > > > these extension points already exist.  I do believe there is
> > > > opportunity to create further extension points as well and the
> > > > challenge / opportunity here will be figuring out what those are and
> > > > what their API should be.
> > > >
> > > > ## I can describe a little bit about what we have already:
> > > >
> > > >  * Filesystem abstraction
> > > >
> > > > Right now we have a filesystem abstraction (arrow::fs::FileSystem)
> > > > which is pretty well documented and straightforward.  This is how we
> > > > can swap between local disk, S3, etc.  From an Acero / datasets
> > > > perspective the API is basically "given a path, give me a stream of
> > > > bytes" (open file) and "given a path, give me a list of files" (list
> > > > directory).
> > > >
> > > >  * FileFormat abstraction
> > > >
> > > > The file format abstraction (arrow::dataset::FileFormat) is how we
> > > > swap out different kinds of files.  For example,
> > > > arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
> > > > plenty of details)
> > > >
> > > >  - Convert input file to schema (inspect)
> > > >  - Convert input file to a stream of batches (scan)
> > > >  - Convert a stream of batches to an output file (write)
> > > >
> > > >  * Fragment / Dataset abstraction
> > > >
> > > > The fragment (arrow::dataset::Fragment) & dataset
> > > > (arrow::dataset::Dataset) APIs are how we describe a collection of
> > > > files.  This is used by the scanner to implement parallel reads.  You
> > > > can think of these as the "source" API.  The APIs are roughly
> > > >
> > > >  - Convert a dataset to a stream of fragments (list dataset)
> > > >  - Convert a fragment to a stream of batches (scan)
> > > >
> > > > The two main implementations of datasets that we have today are
> > > > FilesystemDataset (uses a filesystem to list files, each file is a
> > > > fragment.  A filesystem fragment uses a format to convert its file to
> > > > a stream of batches) and the InMemoryDataset (there is one fragment
> > > > and the scan operation is just slicing off pieces of the in-memory
> > > > data).  There are also some niche implementations here like a dataset
> > > > that is created from a python iterable of batches.  This might be
> very
> > > > similar to what you are describing above.
> > > >
> > > > A dataset must be created with a "dataset schema" which is the single
> > > > schema that all fragments of the dataset can be converted to.
> > > >
> > > > * Custom source nodes
> > > >
> > > > All of the above is exposed to Acero via the scan node which is
> > > > responsible for turning a dataset into Acero input.  However, the
> > > > datasets API could be bypassed entirely to feed Acero in other ways.
> > > > Some examples:
> > > >
> > > >  - The table source node is a way to feed in-memory data (a table)
> > > > into Acero.  This is very similar to the InMemoryDataset but bypasses
> > > > some of the overhead of the scanner.
> > > >  - The TPC-H source node generates random data for benchmarking
> > > > purposes.
> > > >
> > > > A lot of things can be expressed both as a simple dataset or a custom
> > > > source node.  There is a bit of duplication here and I don't know
> that
> > > > it matters too much.  I'm just pointing this out for pedantic
> > > > purposes.
> > > >
> > > > The API here is just the ExecNode API and so the user needs to
> provide
> > > > something that starts when StartProducing is called and then calls
> > > > InputReceived on a regular basis.  From the discussions on the
> > > > scheduler I suspect this API may be changing slightly but the idea is
> > > > still there.
> > > >
> > > > ## I'm also aware of a number of things we are still going to need at
> > > > some point.
> > > >
> > > >  * Evolution
> > > >
> > > > Sometimes different files in a dataset have different schemas.  A
> very
> > > > common case is fields getting added over time or fields getting
> > > > renamed or changing data type (e.g. int32 -> int64).  We have some
> > > > support for the former but none for the latter.  I've got a pretty
> > > > good idea of what the API looks like for "evolution" so if that is
> > > > something needed I could write that up.
> > > >
> > > >  * Flight dataset/fragment/source-node?
> > > >
> > > > I don't think there will be exactly a "FlightFragment".  Maybe the
> > > > right term is ADBC, but I haven't been following that discussion
> > > > closely enough.  There needs to be a way to scan a remote data source
> > > > that provides its data via a standard flight service.
> > > >
> > > >  * Sql dataset/fragment/source-node?
> > > >
> > > > It could be very useful to have a dataset that is capable of reading
> > > > data from SQL databases via something like JDBC (although, if ADBC
> > > > connectors get built quickly enough, maybe this is never needed :)
> > > >
> > > >  * Catalogs
> > > >
> > > > The filesystem dataset currently figures out the dataset schema
> > > > through a rather expensive inspection process and it lists its
> > > > fragments using potentially expensive directory listing.  Metadata
> > > > catalogs (e.g. hive) and table formats (e.g. iceberg) often have ways
> > > > of storing precomputed versions of this information.  A dataset that
> > > > is capable of figuring out what files to scan from a catalog would be
> > > > valuable.  This dataset might use the same filesystem fragment to do
> > > > the actual scan.
> > > >
> > > >  * Table metadata
> > > >
> > > > Very similar to the catalogs discussion is the idea of "table
> > > > metadata".  This is less about reading data and more about describing
> > > > the data.  For example, metadata about any ordering of the incoming
> > > > data, unique constraints, not-null constraints, etc.  All of this
> > > > information can be used by exec nodes to simplify query processing.
> > > > For example, if you are grouping on a set of keys and one of the keys
> > > > is ordered then you implement group by with a streaming (not pipeline
> > > > breaking) implementation.
> > > >
> > > >> that allows utilizing existing Python APIs that know how to read a
> > > >> data source as a stream of record batches.
> > > >
> > > > We have a class called arrow::dataset::<unnamed>::OneShotFragment
> > > > which knows how to convert a python iterator into a scannable source.
> > > > This might serve your needs.  This was also written when things were
> > > > more dataset-oriented.  It might also be interesting to create a
> > > > python source node which does the same sort of thing, bypassing the
> > > > scanner, although I don't know that there would be much concrete
> > > > benefit.
> > > >
> > > > I hope this information is helpful!  It is just background though.  I
> > > > think I might need to understand your needs in a bit more detail
> > > > before I can offer any kind of prescriptive advice.
> > > >
> > > > On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ic...@gmail.com> wrote:
> > > >>
> > > >> Actually, "UDF" might be the wrong terminology here - This is more
> of a
> > > >> "custom Python data source" than "Python user defined functions".
> (Although
> > > >> under the hood it can probably reuse lots of the UDF logic to
> execute the
> > > >> custom data source)
> > > >>
> > > >> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ic...@gmail.com>
> wrote:
> > > >>
> > > >> > What Yaron is going for is really something similar to custom
> data source
> > > >> > in Spark (
> > > >> >
> https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a
> )
> > > >> > that allows utilizing existing Python APIs that know how to read a
> > > >> > data source as a stream of record batches.
> > > >> >
> > > >> >
> > > >> >
>
-- 
Vibhatha Abeykoon

Re: data-source UDFs

Posted by Weston Pace <we...@gmail.com>.
That makes sense to me.  These are my (perhaps naive) first thoughts
at the shape such an implementation might take...

* The function returns an iterator or a context manager that returns an iterator
* If a context manager then __enter__ will be called when the engine
is ready to start receiving data from that source
* Each call to `next()` will be made from a new I/O thread task so the
call is free to block until data has arrived
* The engine will not make reentrant calls to `next()`.  If the source
wants to do readahead it needs to manage that on its own.
* At some point the engine will be finished
  * If a context manager then __exit__ will be called.  Otherwise the
engine will just drop the reference so the source is eligible for gc
  * This might happen before the iterator is exhausted (user requested
cancellation)
  * This cleanup will happen in the destructor of the source node
  * The call to __exit__ should not return until all files are closed,
resources released, etc.
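
To illustrate, a minimal sketch (purely illustrative, not an existing API; I'm
using an IPC stream file only as a stand-in for whatever the source actually
reads):

    import pyarrow as pa

    class IpcStreamSource:
        """Context manager that returns an iterator of RecordBatches."""

        def __init__(self, path):
            self.path = path
            self._file = None

        def __enter__(self):
            # The engine is ready to receive data: open files/connections here.
            self._file = pa.OSFile(self.path)
            reader = pa.ipc.open_stream(self._file)
            return iter(reader)  # each next() may block until data arrives

        def __exit__(self, exc_type, exc, tb):
            # May be called before the iterator is exhausted (cancellation);
            # all resources should be released before returning.
            self._file.close()
            return False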

This should be doable with the current source node and building a
special async generator for these python sources.  The Substrait
consumer would just need to instantiate this generator.  The function
registry wouldn't need to be involved although I could imagine
implementations in which it is.

I don't know if your producers and consumers live on the same server
or not.  If they do not it could be a little tricky for users to test
/ develop these sources since they probably depend on
filesystems/connections/etc. that don't exist on the producer.

I don't know if "context manager that returns an iterator" is a
regular enough python concept to be intuitive or if it would be better
to invent a new class at that point.  The key lifecycle moments are:

 * Start - Called when the engine is ready.  Today this will be called
as soon as the engine starts but there has been discussion that, in
the future, we might want to delay this.  For example, if connected to
a hash-join node you might not want to start reading the probe side
immediately and instead focus on the build side.  I don't know if this
will ever happen but it's probably better to plan for it.  Even in the
case where we call it at plan start it would be useful to have this in
case we create a plan and start it much later.
 * Next - Simple (safe to block) method to get the next batch of data
 * Finish - Called to clean up, potentially called before the source is
fully read in the case of cancellation
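
If we did invent a new class, a strawman shape for it (names purely
illustrative) might be:

    from typing import Optional
    import abc
    import pyarrow as pa

    class RecordBatchSource(abc.ABC):
        @abc.abstractmethod
        def start(self) -> None:
            """Called once the engine is ready to receive data from this source."""

        @abc.abstractmethod
        def next(self) -> Optional[pa.RecordBatch]:
            """Safe to block; return the next batch, or None when exhausted."""

        @abc.abstractmethod
        def finish(self) -> None:
            """Clean up; may be called before the source is fully read."""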

On Sun, Jun 5, 2022 at 9:54 PM Yaron Gvili <rt...@hotmail.com> wrote:
>
> > Any C++ extension point could become a pyarrow extension point if desired.
>
> Agreed. Still, the listed extension points are for supporting an implementation, not integration. The existing integration code for a Scalar UDF written in Python (See https://github.com/apache/arrow/pull/12590) is the kind of code I have in mind for supporting integration.
>
> > I have a few questions then.
>
> Before I answer each question, here's a wider view of the design I have in mind. The (Python code of the) data-source UDF is intended to be serialized and embedded in a Substrait plan (I already have this serialization and embedding implemented locally - PR to come). The UDF has/needs no arguments because its code is specific to the plan. The UDF returns a new stream/generator of RecordBatch instances to be used by the source-node in each execution of the plan. The UDF itself is made by a UDF-maker that does get arguments, which are used to derive values that are fixed into the UDF being made.
>
> To your questions in order:
>
>   1.  The data-source UDF is stateless and takes no arguments. The UDF-maker does take arguments, while the RecordBatch stream/generator has state.
>   2.  The data-source UDF does not need any information from the execution plan; it does exactly the same thing in each execution - return a new RecordBatch stream/generator for the same data. Only the source-node that holds the data-source UDF interacts with the execution plan.
>   3.  Values derived by the UDF-maker from its arguments are fixed into the data-source UDF rather than passed to it. A path can certainly be included in these values. Functions like "Close" may be implemented in the RecordBatch stream/generator.
>   4.  Using the function registry is actually not a critical part of the design I'm proposing, which is one reason I asked about it. It just seemed to me as a reasonable place for the execution plan to find the data-source UDF after it gets deserialized from the Substrait plan. If the function registry is not used, then the design would need some other registry or mechanism for passing the deserialized data source-UDF to the execution plan.
>   5.  The data-source UDF is specific to an execution plan, so definitely specific to the user who created the Substrait plan in which it is embedded. Users (or perhaps authors) can share data-source-UDF-makers, or libraries thereof, for general-purpose use.
>
> Yaron.
> ________________________________
> From: Weston Pace <we...@gmail.com>
> Sent: Saturday, June 4, 2022 2:41 PM
> To: dev@arrow.apache.org <de...@arrow.apache.org>
> Subject: Re: data-source UDFs
>
> > The former is about facilities (like extension points) for implementing custom data sources in Arrow whereas the latter is about facilities for integrating in PyArrow (existing or future) data sources written/wrapped in Python
>
> Any C++ extension point could become a pyarrow extension point if
> desired.  For example, we already have a pure-python filesystem
> adapter if a user wants to implement a filesystem purely in python
> (this is how the fsspec adapter works I think).
>
> > I'm especially interested in feedback about the new function-kind and extensions of "cpp/src/arrow/python/udf.h" I proposed
>
> I have a few questions then.
>
> You mentioned that these functions would take no arguments.  Li's
> rough example showed a path being provided to the UDF.
>
>   1. If the UDF takes no arguments then I will assume it must be
> stateful.  That doesn't really match with the current idea of the
> function registry.  Although it may be a problem we need to someday
> solve for aggregate UDFs.  What would you imagine the lifecycle is for
> this function?  If it is stateful would it not be more accurate to
> call it something other than a function?
>   2. If the UDF receives no arguments then do you need to communicate
> any part of the execution plan (e.g. the Substrait plan) to the UDF?
> How would you envision this happening?  Or is this a source that
> always does the exact same thing regardless of the execution plan
> (this could be valid if, for example, the user configures the source
> with a plan independent of the execution plan before they register
> it)?
>   3. If the UDF is not stateful then it receives some arguments.  What
> are they?  Does the UDF receive a path?  Or does the UDF generate
> paths?  Are there two different kinds of UDFs, ones that receive paths
> and ones that generate paths?
>
> Most of the extension points I described have more than one method.
> Even in the APIs where I only listed one method there are some utility
> methods like `Close` which I didn't list for simplicity.  While you
> can generally represent a class as a "bag of functions" (a stateful
> class is a "bag of functions that takes state as the first argument")
> I find it to be more difficult for users to follow than just
> registering a type with a defined interface.
>
>   4. Is there a particular reason in your use case for using the
> function registry for this?
>   5. Do you imagine these UDFs would always be specific to particular
> users?  Or would it be possible for such a UDF to be shared as a
> general purpose utility?
>
> On Sat, Jun 4, 2022 at 3:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > Thanks for the detailed overview, Weston. I agree with David this would be very useful to have in a public doc.
> >
> > Weston and David's discussion is a good one, however, I see it as separate from the discussion I brought up. The former is about facilities (like extension points) for implementing custom data sources in Arrow whereas the latter is about facilities for integrating in PyArrow (existing or future) data sources written/wrapped in Python. In this latter discussion, I'm indifferent to the complexities of data source implementation. I'm especially interested in feedback about the new function-kind and extensions of "cpp/src/arrow/python/udf.h" I proposed, as well as possible alternatives to these, and more generally in reaching consensus about how a custom data-source written/wrapped in Python would get integrated.
> >
> > > > At the moment, as we are not exposing the execution engine primitives
> > > > to the Python user, are you expecting to expose them with this approach?
> > >
> > > > From our side, these APIs are not directly exposed to the end user, but
> > > > are rather primitives for us to build on top of.
> >
> > For clarity of discussion, I'd suggest distinguishing between a data-source-integrator and an Acero-user (or end-user), since in many use cases these are not the same person. When I wrote user, I meant a data-source-integrator. An Acero-user would not be directly using the facilities I proposed.
> >
> >
> > Yaron.
> > ________________________________
> > From: David Li <li...@apache.org>
> > Sent: Friday, June 3, 2022 5:53 PM
> > To: dev@arrow.apache.org <de...@arrow.apache.org>
> > Subject: Re: data-source UDFs
> >
> > Thanks for the overview of the different extension points, it's nice to see this laid out. (It would be great to find a place in the docs for this, IMO, or possibly as a blog post?)
> >
> > Just to chime in quickly here:
> >
> > For databases/Flight, my hope is that integrating ADBC into Arrow Datasets will take care of both. Plain Flight isn't quite well-defined enough to be meaningfully integrated (except perhaps via a generic "stream of batches" entrypoint), and even if we wanted to feed JDBC/ODBC into an ExecPlan, we'd have to do some work that would look roughly like writing an ADBC driver, so we may as well go that route.
> >
> > -David
> >
> > On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> > > Efficiently reading from a data source is something that has a bit of
> > > complexity (parsing files, connecting to remote data sources, managing
> > > parallel reads, etc.)  Ideally we don't want users to have to reinvent
> > > these things as they go.  The datasets module in Arrow-C++ has a lot
> > > of code here already.
> > >
> > > So I think there is probably more than one extension point.  Some of
> > > these extension points already exist.  I do believe there is
> > > opportunity to create further extension points as well and the
> > > challenge / opportunity here will be figuring out what those are and
> > > what their API should be.
> > >
> > > ## I can describe a little bit about what we have already:
> > >
> > >  * Filesystem abstraction
> > >
> > > Right now we have a filesystem abstraction (arrow::fs::FileSystem)
> > > which is pretty well documented and straightforward.  This is how we
> > > can swap between local disk, S3, etc.  From an Acero / datasets
> > > perspective the API is basically "given a path, give me a stream of
> > > bytes" (open file) and "given a path, give me a list of files" (list
> > > directory).
> > >
> > >  * FileFormat abstraction
> > >
> > > The file format abstraction (arrow::dataset::FileFormat) is how we
> > > swap out different kinds of files.  For example,
> > > arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
> > > plenty of details)
> > >
> > >  - Convert input file to schema (inspect)
> > >  - Convert input file to a stream of batches (scan)
> > >  - Convert a stream of batches to an output file (write)
> > >
> > >  * Fragment / Dataset abstraction
> > >
> > > The fragment (arrow::dataset::Fragment) & dataset
> > > (arrow::dataset::Dataset) APIs are how we describe a collection of
> > > files.  This is used by the scanner to implement parallel reads.  You
> > > can think of these as the "source" API.  The APIs are roughly
> > >
> > >  - Convert a dataset to a stream of fragments (list dataset)
> > >  - Convert a fragment to a stream of batches (scan)
> > >
> > > The two main implementations of datasets that we have today are
> > > FilesystemDataset (uses a filesystem to list files, each file is a
> > > fragment.  A filesystem fragment uses a format to convert its file to
> > > a stream of batches) and the InMemoryDataset (there is one fragment
> > > and the scan operation is just slicing off pieces of the in-memory
> > > data).  There are also some niche implementations here like a dataset
> > > that is created from a python iterable of batches.  This might be very
> > > similar to what you are describing above.
> > >
> > > A dataset must be created with a "dataset schema" which is the single
> > > schema that all fragments of the dataset can be converted to.
> > >
> > > * Custom source nodes
> > >
> > > All of the above is exposed to Acero via the scan node which is
> > > responsible for turning a dataset into Acero input.  However, the
> > > datasets API could be bypassed entirely to feed Acero in other ways.
> > > Some examples:
> > >
> > >  - The table source node is a way to feed in-memory data (a table)
> > > into Acero.  This is very similar to the InMemoryDataset but bypasses
> > > some of the overhead of the scanner.
> > >  - The TPC-H source node generates random data for benchmarking purposes.
> > >
> > > A lot of things can be expressed both as a simple dataset or a custom
> > > source node.  There is a bit of duplication here and I don't know that
> > > it matters too much.  I'm just pointing this out for pedantic
> > > purposes.
> > >
> > > The API here is just the ExecNode API and so the user needs to provide
> > > something that starts when StartProducing is called and then calls
> > > InputReceived on a regular basis.  From the discussions on the
> > > scheduler I suspect this API may be changing slightly but the idea is
> > > still there.
> > >
> > > ## I'm also aware of a number of things we are still going to need at
> > > some point.
> > >
> > >  * Evolution
> > >
> > > Sometimes different files in a dataset have different schemas.  A very
> > > common case is fields getting added over time or fields getting
> > > renamed or changing data type (e.g. int32 -> int64).  We have some
> > > support for the former but none for the latter.  I've got a pretty
> > > good idea of what the API looks like for "evolution" so if that is
> > > something needed I could write that up.
> > >
> > >  * Flight dataset/fragment/source-node?
> > >
> > > I don't think there will be exactly a "FlightFragment".  Maybe the
> > > right term is ADBC, but I haven't been following that discussion
> > > closely enough.  There needs to be a way to scan a remote data source
> > > that provides its data via a standard flight service.
> > >
> > >  * Sql dataset/fragment/source-node?
> > >
> > > It could be very useful to have a dataset that is capable of reading
> > > data from SQL databases via something like JDBC (although, if ADBC
> > > connectors get built quickly enough, maybe this is never needed :)
> > >
> > >  * Catalogs
> > >
> > > The filesystem dataset currently figures out the dataset schema
> > > through a rather expensive inspection process and it lists its
> > > fragments using potentially expensive directory listing.  Metadata
> > > catalogs (e.g. hive) and table formats (e.g. iceberg) often have ways
> > > of storing precomputed versions of this information.  A dataset that
> > > is capable of figuring out what files to scan from a catalog would be
> > > valuable.  This dataset might use the same filesystem fragment to do
> > > the actual scan.
> > >
> > >  * Table metadata
> > >
> > > Very similar to the catalogs discussion is the idea of "table
> > > metadata".  This is less about reading data and more about describing
> > > the data.  For example, metadata about any ordering of the incoming
> > > data, unique constraints, not-null constraints, etc.  All of this
> > > information can be used by exec nodes to simplify query processing.
> > > For example, if you are grouping on a set of keys and one of the keys
> > > is ordered then you implement group by with a streaming (not pipeline
> > > breaking) implementation.
> > >
> > >> that allows utilizing existing Python APIs that know how to read a data
> > >> source as a stream of record batches.
> > >
> > > We have a class called arrow::dataset::<unnamed>::OneShotFragment
> > > which knows how to convert a python iterator into a scannable source.
> > > This might serve your needs.  This was also written when things were
> > > more dataset-oriented.  It might also be interesting to create a
> > > python source node which does the same sort of thing, bypassing the
> > > scanner, although I don't know that there would be much concrete
> > > benefit.
> > >
> > > I hope this information is helpful!  It is just background though.  I
> > > think I might need to understand your needs in a bit more detail
> > > before I can offer any kind of prescriptive advice.
> > >
> > > On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ic...@gmail.com> wrote:
> > >>
> > >> Actually, "UDF" might be the wrong terminology here - This is more of a
> > >> "custom Python data source" than "Python user defined functions". (Although
> > >> under the hood it can probably reuse lots of the UDF logic to execute the
> > >> custom data source)
> > >>
> > >> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ic...@gmail.com> wrote:
> > >>
> > >> > What Yaron is going for is really something similar to custom data source
> > >> > in Spark (
> > >> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
> > >> > that allows utilizing existing Python APIs that know how to read a data
> > >> > source as a stream of record batches.
> > >> >
> > >> >
> > >> >

Re: data-source UDFs

Posted by Yaron Gvili <rt...@hotmail.com>.
> Any C++ extension point could become a pyarrow extension point if desired.

Agreed. Still, the listed extension points are for supporting an implementation, not integration. The existing integration code for a Scalar UDF written in Python (See https://github.com/apache/arrow/pull/12590) is the kind of code I have in mind for supporting integration.

> I have a few questions then.

Before I answer each question, here's a wider view of the design I have in mind. The (Python code of the) data-source UDF is intended to be serialized and embedded in a Substrait plan (I already have this serialization and embedding implemented locally - PR to come). The UDF has/needs no arguments because its code is specific to the plan. The UDF returns a new stream/generator of RecordBatch instances to be used by the source-node in each execution of the plan. The UDF itself is made by a UDF-maker that does get arguments, which are used to derive values that are fixed into the UDF being made.
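
As a rough sketch of the maker pattern (plain Python, no new Arrow API implied;
the CSV path is just an example):

    import pyarrow.csv as pacsv

    def make_csv_source_udf(path):
        # UDF-maker: takes arguments and fixes values derived from them into
        # the zero-argument UDF it returns.
        def source_udf():
            # Returns a fresh stream of RecordBatches on every plan execution.
            return iter(pacsv.open_csv(path))
        return source_udf

    # The maker runs where the Substrait plan is produced; only source_udf,
    # with the path baked in, gets serialized into the plan.
    udf = make_csv_source_udf("/tmp/example.csv")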

To your questions in order:

  1.  The data-source UDF is stateless and takes no arguments. The UDF-maker does take arguments, while the RecordBatch stream/generator has state.
  2.  The data-source UDF does not need any information from the execution plan; it does exactly the same thing in each execution - return a new RecordBatch stream/generator for the same data. Only the source-node that holds the data-source UDF interacts with the execution plan.
  3.  Values derived by the UDF-maker from its arguments are fixed into the data-source UDF rather than passed to it. A path can certainly be included in these values. Functions like "Close" may be implemented in the RecordBatch stream/generator.
  4.  Using the function registry is actually not a critical part of the design I'm proposing, which is one reason I asked about it. It just seemed to me as a reasonable place for the execution plan to find the data-source UDF after it gets deserialized from the Substrait plan. If the function registry is not used, then the design would need some other registry or mechanism for passing the deserialized data source-UDF to the execution plan.
  5.  The data-source UDF is specific to an execution plan, so definitely specific to the user who created the Substrait plan in which it is embedded. Users (or perhaps authors) can share data-source-UDF-makers, or libraries thereof, for general-purpose use.

Yaron.
________________________________
From: Weston Pace <we...@gmail.com>
Sent: Saturday, June 4, 2022 2:41 PM
To: dev@arrow.apache.org <de...@arrow.apache.org>
Subject: Re: data-source UDFs

> The former is about facilities (like extension points) for implementing custom data sources in Arrow whereas the latter is about facilities for integrating in PyArrow (existing or future) data sources written/wrapped in Python

Any C++ extension point could become a pyarrow extension point if
desired.  For example, we already have a pure-python filesystem
adapter if a user wants to implement a filesystem purely in python
(this is how the fsspec adapter works I think).

> I'm especially interested in feedback about the new function-kind and extensions of "cpp/src/arrow/python/udf.h" I proposed

I have a few questions then.

You mentioned that these functions would take no arguments.  Li's
rough example showed a path being provided to the UDF.

  1. If the UDF takes no arguments then I will assume it must be
stateful.  That doesn't really match with the current idea of the
function registry.  Although it may be a problem we need to someday
solve for aggregate UDFs.  What would you imagine the lifecycle is for
this function?  If it is stateful would it not be more accurate to
call it something other than a function?
  2. If the UDF receives no arguments then do you need to communicate
any part of the execution plan (e.g. the Substrait plan) to the UDF?
How would you envision this happening?  Or is this a source that
always does the exact same thing regardless of the execution plan
(this could be valid if, for example, the user configures the source
with a plan independent of the execution plan before they register
it)?
  3. If the UDF is not stateful then it receives some arguments.  What
are they?  Does the UDF receive a path?  Or does the UDF generate
paths?  Are there two different kinds of UDFs, ones that receive paths
and ones that generate paths?

Most of the extension points I described have more than one method.
Even in the APIs where I only listed one method there are some utility
methods like `Close` which I didn't list for simplicity.  While you
can generally represent a class as a "bag of functions" (a stateful
class is a "bag of functions that takes state as the first argument")
I find it to be more difficult for users to follow than just
registering a type with a defined interface.

  4. Is there a particular reason in your use case for using the
function registry for this?
  5. Do you imagine these UDFs would always be specific to particular
users?  Or would it be possible for such a UDF to be shared as a
general purpose utility?

On Sat, Jun 4, 2022 at 3:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
> Thanks for the detailed overview, Weston. I agree with David this would be very useful to have in a public doc.
>
> Weston and David's discussion is a good one, however, I see it as separate from the discussion I brought up. The former is about facilities (like extension points) for implementing custom data sources in Arrow whereas the latter is about facilities for integrating in PyArrow (existing or future) data sources written/wrapped in Python. In this latter discussion, I'm indifferent to the complexities of data source implementation. I'm especially interested in feedback about the new function-kind and extensions of "cpp/src/arrow/python/udf.h" I proposed, as well as possible alternatives to these, and more generally in reaching consensus about how a custom data-source written/wrapped in Python would get integrated.
>
> > > At the moment, as we are not exposing the execution engine primitives
> > > to the Python user, are you expecting to expose them with this approach?
> >
> > > From our side, these APIs are not directly exposed to the end user, but
> > > are rather primitives for us to build on top of.
>
> For clarity of discussion, I'd suggest distinguishing between a data-source-integrator and an Acero-user (or end-user), since in many use cases these are not the same person. When I wrote user, I meant a data-source-integrator. An Acero-user would not be directly using the facilities I proposed.
>
>
> Yaron.
> ________________________________
> From: David Li <li...@apache.org>
> Sent: Friday, June 3, 2022 5:53 PM
> To: dev@arrow.apache.org <de...@arrow.apache.org>
> Subject: Re: data-source UDFs
>
> Thanks for the overview of the different extension points, it's nice to see this laid out. (It would be great to find a place in the docs for this, IMO, or possibly as a blog post?)
>
> Just to chime in quickly here:
>
> For databases/Flight, my hope is that integrating ADBC into Arrow Datasets will take care of both. Plain Flight isn't quite well-defined enough to be meaningfully integrated (except perhaps via a generic "stream of batches" entrypoint), and even if we wanted to feed JDBC/ODBC into an ExecPlan, we'd have to do some work that would look roughly like writing an ADBC driver, so we may as well go that route.
>
> -David
>
> On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> > Efficiently reading from a data source is something that has a bit of
> > complexity (parsing files, connecting to remote data sources, managing
> > parallel reads, etc.)  Ideally we don't want users to have to reinvent
> > these things as they go.  The datasets module in Arrow-C++ has a lot
> > of code here already.
> >
> > So I think there is probably more than one extension point.  Some of
> > these extension points already exist.  I do believe there is
> > opportunity to create further extension points as well and the
> > challenge / opportunity here will be figuring out what those are and
> > what their API should be.
> >
> > ## I can describe a little bit about what we have already:
> >
> >  * Filesystem abstraction
> >
> > Right now we have a filesystem abstraction (arrow::fs::FileSystem)
> > which is pretty well documented and straightforward.  This is how we
> > can swap between local disk, S3, etc.  From an Acero / datasets
> > perspective the API is basically "given a path, give me a stream of
> > bytes" (open file) and "given a path, give me a list of files" (list
> > directory).
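> >
> > A quick sketch of that surface in python (assuming current pyarrow; the
> > paths here are just examples):
> >
> >   from pyarrow import fs
> >
> >   local = fs.LocalFileSystem()
> >   # "given a path, give me a list of files" (list directory)
> >   infos = local.get_file_info(fs.FileSelector("/data/my_dataset", recursive=True))
> >   # "given a path, give me a stream of bytes" (open file)
> >   stream = local.open_input_stream("/data/my_dataset/part-0.parquet")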
> >
> >  * FileFormat abstraction
> >
> > The file format abstraction (arrow::dataset::FileFormat) is how we
> > swap out different kinds of files.  For example,
> > arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
> > plenty of details)
> >
> >  - Convert input file to schema (inspect)
> >  - Convert input file to a stream of batches (scan)
> >  - Convert a stream of batches to an output file (write)
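> >
> > In python the same surface looks roughly like this (a sketch, assuming
> > current pyarrow; the path is made up):
> >
> >   import pyarrow.dataset as pads
> >   from pyarrow import fs
> >
> >   local = fs.LocalFileSystem()
> >   fmt = pads.ParquetFileFormat()
> >   # Convert input file to schema (inspect)
> >   schema = fmt.inspect("/data/my_dataset/part-0.parquet", filesystem=local)
> >   # Convert input file to a stream of batches (scan, via a fragment)
> >   fragment = fmt.make_fragment("/data/my_dataset/part-0.parquet", filesystem=local)
> >   batches = fragment.to_batches()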
> >
> >  * Fragment / Dataset abstraction
> >
> > The fragment (arrow::dataset::Fragment) & dataset
> > (arrow::dataset::Dataset) APIs are how we describe a collection of
> > files.  This is used by the scanner to implement parallel reads.  You
> > can think of these as the "source" API.  The APIs are roughly
> >
> >  - Convert a dataset to a stream of fragments (list dataset)
> >  - Convert a fragment to a stream of batches (scan)
> >
> > The two main implementations of datasets that we have today are
> > FilesystemDataset (uses a filesystem to list files, each file is a
> > fragment.  A filesystem fragment uses a format to convert its file to
> > a stream of batches) and the InMemoryDataset (there is one fragment
> > and the scan operation is just slicing off pieces of the in-memory
> > data).  There are also some niche implementations here like a dataset
> > that is created from a python iterable of batches.  This might be very
> > similar to what you are describing above.
> >
> > A dataset must be created with a "dataset schema" which is the single
> > schema that all fragments of the dataset can be converted to.
> >
> > * Custom source nodes
> >
> > All of the above is exposed to Acero via the scan node which is
> > responsible for turning a dataset into Acero input.  However, the
> > datasets API could be bypassed entirely to feed Acero in other ways.
> > Some examples:
> >
> >  - The table source node is a way to feed in-memory data (a table)
> > into Acero.  This is very similar to the InMemoryDataset but bypasses
> > some of the overhead of the scanner.
> >  - The TPC-H source node generates random data for benchmarking purposes.
> >
> > A lot of things can be expressed both as a simple dataset or a custom
> > source node.  There is a bit of duplication here and I don't know that
> > it matters too much.  I'm just pointing this out for pedantic
> > purposes.
> >
> > The API here is just the ExecNode API and so the user needs to provide
> > something that starts when StartProducing is called and then calls
> > InputReceived on a regular basis.  From the discussions on the
> > scheduler I suspect this API may be changing slightly but the idea is
> > still there.
> >
> > ## I'm also aware of a number of things we are still going to need at
> > some point.
> >
> >  * Evolution
> >
> > Sometimes different files in a dataset have different schemas.  A very
> > common case is fields getting added over time or fields getting
> > renamed or changing data type (e.g. int32 -> int64).  We have some
> > support for the former but none for the latter.  I've got a pretty
> > good idea of what the API looks like for "evolution" so if that is
> > something needed I could write that up.
> >
> >  * Flight dataset/fragment/source-node?
> >
> > I don't think there will be exactly a "FlightFragment".  Maybe the
> > right term is ADBC, but I haven't been following that discussion
> > closely enough.  There needs to be a way to scan a remote data source
> > that provides its data via a standard flight service.
> >
> >  * Sql dataset/fragment/source-node?
> >
> > It could be very useful to have a dataset that is capable of reading
> > data from SQL databases via something like JDBC (although, if ADBC
> > connectors get built quickly enough, maybe this is never needed :)
> >
> >  * Catalogs
> >
> > The filesystem dataset currently figures out the dataset schema
> > through a rather expensive inspection process and it lists its
> > fragments using potentially expensive directory listing.  Metadata
> > catalogs (e.g. hive) and table formats (e.g. iceberg) often have ways
> > of storing precomputed versions of this information.  A dataset that
> > is capable of figuring out what files to scan from a catalog would be
> > valuable.  This dataset might use the same filesystem fragment to do
> > the actual scan.
> >
> >  * Table metadata
> >
> > Very similar to the catalogs discussion is the idea of "table
> > metadata".  This is less about reading data and more about describing
> > the data.  For example, metadata about any ordering of the incoming
> > data, unique constraints, not-null constraints, etc.  All of this
> > information can be used by exec nodes to simplify query processing.
> > For example, if you are grouping on a set of keys and one of the keys
> > is ordered then you implement group by with a streaming (not pipeline
> > breaking) implementation.
> >
> >> that allows utilizing existing Python APIs that know how to read a data
> >> source as a stream of record batches.
> >
> > We have a class called arrow::dataset::<unnamed>::OneShotFragment
> > which knows how to convert a python iterator into a scannable source.
> > This might serve your needs.  This was also written when things were
> > more dataset-oriented.  It might also be interesting to create a
> > python source node which does the same sort of thing, bypassing the
> > scanner, although I don't know that there would be much concrete
> > benefit.
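> >
> > Roughly, from memory (so treat the exact signatures as an assumption):
> >
> >   import pyarrow as pa
> >   import pyarrow.dataset as pads
> >
> >   schema = pa.schema([("x", pa.int64())])
> >
> >   def batches():
> >       for i in range(3):
> >           yield pa.record_batch([pa.array([i, i + 1])], schema=schema)
> >
> >   # A one-shot scan over a python iterator of batches
> >   # (this is the code path that goes through OneShotFragment, I believe):
> >   scanner = pads.Scanner.from_batches(batches(), schema=schema)
> >   table = scanner.to_table()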
> >
> > I hope this information is helpful!  It is just background though.  I
> > think I might need to understand your needs in a bit more detail
> > before I can offer any kind of prescriptive advice.
> >
> > On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ic...@gmail.com> wrote:
> >>
> >> Actually, "UDF" might be the wrong terminology here - This is more of a
> >> "custom Python data source" than "Python user defined functions". (Although
> >> under the hood it can probably reuse lots of the UDF logic to execute the
> >> custom data source)
> >>
> >> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ic...@gmail.com> wrote:
> >>
> >> > What Yaron is going for is really something similar to custom data source
> >> > in Spark (
> >> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
> >> > that allows utilizing existing Python APIs that know how to read a data
> >> > source as a stream of record batches.
> >> >
> >> >
> >> >

Re: data-source UDFs

Posted by Weston Pace <we...@gmail.com>.
> The former is about facilities (like extension points) for implementing custom data sources in Arrow whereas the latter is about facilities for integrating in PyArrow (existing or future) data sources written/wrapped in Python

Any C++ extension point could become a pyarrow extension point if
desired.  For example, we already have a pure-python filesystem
adapter if a user wants to implement a filesystem purely in python
(this is how the fsspec adapter works I think).

> I'm especially interested in feedback about the new function-kind and extensions of "cpp/src/arrow/python/udf.h" I proposed

I have a few questions then.

You mentioned that these functions would take no arguments.  Li's
rough example showed a path being provided to the UDF.

  1. If the UDF takes no arguments then I will assume it must be
stateful.  That doesn't really match with the current idea of the
function registry.  Although it may be a problem we need to someday
solve for aggregate UDFs.  What would you imagine the lifecycle is for
this function?  If it is stateful would it not be more accurate to
call it something other than a function?
  2. If the UDF receives no arguments then do you need to communicate
any part of the execution plan (e.g. the Substrait plan) to the UDF?
How would you envision this happening?  Or is this a source that
always does the exact same thing regardless of the execution plan
(this could be valid if, for example, the user configures the source
with a plan independent of the execution plan before they register
it)?
  3. If the UDF is not stateful then it receives some arguments.  What
are they?  Does the UDF receive a path?  Or does the UDF generate
paths?  Are there two different kinds of UDFs, ones that receive paths
and ones that generate paths?

Most of the extension points I described have more than one method.
Even in the APIs where I only listed one method there are some utility
methods like `Close` which I didn't list for simplicity.  While you
can generally represent a class as a "bag of functions" (a stateful
class is a "bag of functions that takes state as the first argument")
I find it to be more difficult for users to follow than just
registering a type with a defined interface.

  4. Is there a particular reason in your use case for using the
function registry for this?
  5. Do you imagine these UDFs would always be specific to particular
users?  Or would it be possible for such a UDF to be shared as a
general purpose utility?

On Sat, Jun 4, 2022 at 3:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
> Thanks for the detailed overview, Weston. I agree with David this would be very useful to have in a public doc.
>
> Weston and David's discussion is a good one, however, I see it as separate from the discussion I brought up. The former is about facilities (like extension points) for implementing custom data sources in Arrow whereas the latter is about facilities for integrating in PyArrow (existing or future) data sources written/wrapped in Python. In this latter discussion, I'm indifferent to the complexities of data source implementation. I'm especially interested in feedback about the new function-kind and extensions of "cpp/src/arrow/python/udf.h" I proposed, as well as possible alternatives to these, and more generally in reaching consensus about how a custom data-source written/wrapped in Python would get integrated.
>
> > > At the moment as we
> > are not exposing the execution engine primitives to Python user, are you
> > expecting to expose them by this approach.
> >
> > From our side, these APIs are not directly exposed to the end user, but
> > rather, primitives that allow us to build on top of.
>
> For clarity of discussion, I'd suggest distinguishing between a data-source-integrator and an Acero-user (or end-user), since in many use cases these are not the same person. When I wrote user, I meant a data-source-integrator. An Acero-user would not be directly using the facilities I proposed.
>
>
> Yaron.
> ________________________________
> From: David Li <li...@apache.org>
> Sent: Friday, June 3, 2022 5:53 PM
> To: dev@arrow.apache.org <de...@arrow.apache.org>
> Subject: Re: data-source UDFs
>
> Thanks for the overview of the different extension points, it's nice to see this laid out. (It would be great to find a place in the docs for this, IMO, or possibly as a blog post?)
>
> Just to chime in quickly here:
>
> For databases/Flight, my hope is that integrating ADBC into Arrow Datasets will take care of both. Plain Flight isn't quite well-defined enough to be meaningfully integrated (except perhaps via a generic "stream of batches" entrypoint), and even if we wanted to feed JDBC/ODBC into an ExecPlan, we'd have to do some work that would look roughly like writing an ADBC driver, so we may as well go that route.
>
> -David
>
> On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> > Efficiently reading from a data source is something that has a bit of
> > complexity (parsing files, connecting to remote data sources, managing
> > parallel reads, etc.)  Ideally we don't want users to have to reinvent
> > these things as they go.  The datasets module in Arrow-C++ has a lot
> > of code here already.
> >
> > So I think there is probably more than one extension point.  Some of
> > these extension points already exist.  I do believe there is
> > opportunity to create further extension points as well and the
> > challenge / opportunity here will be figuring out what those are and
> > what their API should be.
> >
> > ## I can describe a little bit about what we have already:
> >
> >  * Filesystem abstraction
> >
> > Right now we have a filesystem abstraction (arrow::fs::FileSystem)
> > which is pretty well documented and straightforward.  This is how we
> > can swap between local disk, S3, etc.  From an Acero / datasets
> > perspective the API is basically "given a path, give me a stream of
> > bytes" (open file) and "given a path, give me a list of files" (list
> > directory).
> >
> >  * FileFormat abstraction
> >
> > The file format abstraction (arrow::dataset::FileFormat) is how we
> > swap out different kinds of files.  For example,
> > arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
> > plenty of details)
> >
> >  - Convert input file to schema (inspect)
> >  - Convert input file to a stream of batches (scan)
> >  - Convert a stream of batches to an output file (write)
> >
> >  * Fragment / Dataset abstraction
> >
> > The fragment (arrow::dataset::Fragment) & dataset
> > (arrow::dataset::Dataset) APIs are how we describe a collection of
> > files.  This is used by the scanner to implement parallel reads.  You
> > can think of these as the "source" API.  The APIs are roughly
> >
> >  - Convert a dataset to a stream fragments (list dataset)
> >  - Convert a fragment to a stream of batches (scan)
> >
> > The two main implementations of datasets that we have today are
> > FilesystemDataset (uses a filesystem to list files, each file is a
> > fragment.  A filesystem fragment uses a format to convert its file to
> > a stream of batches) and the InMemoryDataset (there is one fragment
> > and the scan operation is just slicing off pieces of the in-memory
> > data).  There are also some niche implementations here like a dataset
> > that is created from a python iterable of batches.  This might be very
> > similar to what you are describing above.
> >
> > A dataset must be created with a "dataset schema" which is the single
> > schema that all fragments of the dataset can be converted to.
> >
> > * Custom source nodes
> >
> > All of the above is exposed to Acero via the scan node which is
> > responsible for turning a dataset into Acero input.  However, the
> > datasets API could be bypassed entirely to feed Acero in other ways.
> > Some examples:
> >
> >  - The table source node is a way to feed in-memory data (a table)
> > into Acero.  This is very similar to the InMemoryDataset but bypasses
> > some of the overhead of the scanner.
> >  - The TCP-H source node generates random data for benchmarking purposes.
> >
> > A lot of things can be expressed both as a simple dataset or a custom
> > source node.  There is a bit of duplication here and I don't know that
> > it matters too much.  I'm just pointing this out for pedantic
> > purposes.
> >
> > The API here is just the ExecNode API and so the user needs to provide
> > something that starts when StartProducing is called and then calls
> > InputReceived on a regular basis.  From the discussions on the
> > scheduler I suspect this API may be changing slightly but the idea is
> > still there.
> >
> > ## I'm also aware of a number of things we are still going to need at
> > some point.
> >
> >  * Evolution
> >
> > Sometimes different files in a dataset have different schemas.  A very
> > common case is fields getting added over time or fields getting
> > renamed or changing data type (e.g. int32 -> int64).  We have some
> > support for the former but none for the latter.  I've got a pretty
> > good idea of what the API looks like for "evolution" so if that is
> > something needed I could write that up.
> >
> >  * Flight dataset/fragment/source-node?
> >
> > I don't think there will be exactly a "FlightFragment".  Maybe the
> > right term is ADBC, but I haven't been following that discussion
> > closely enough.  There needs to be a way to scan a remote data source
> > that provides its data via a standard flight service.
> >
> >  * Sql dataset/fragment/source-node?
> >
> > It could be very useful to have a dataset that is capable of reading
> > data from SQL datasets via something like JDBC (although, if ADBC
> > connectors get built quickly enough, maybe this is never needed :)
> >
> >  * Catalogs
> >
> > The filesystem dataset currently figures out the dataset schema
> > through a rather expensive inspection process and it lists its
> > fragments using potentially expensive directory listing.  Metadata
> > catalogs (e.g. hive) and table formats (e.g. iceberg) often have ways
> > of storing precomputed versions of this information.  A dataset that
> > is capable of figuring out what files to scan from a catalog would be
> > valuable.  This dataset might use the same filesystem fragment to do
> > the actual scan.
> >
> >  * Table metadata
> >
> > Very similar to the catalogs discussion is the idea of "table
> > metadata".  This is less about reading data and more about describing
> > the data.  For example, metadata about any ordering of the incoming
> > data, unique constraints, not-null constraints, etc.  All of this
> > information can be used by exec nodes to simplify query processing.
> > For example, if you are grouping on a set of keys and one of the keys
> > is ordered then you implement group by with a streaming (not pipeline
> > breaking) implementation.
> >
> >> that allows utilizing existing Python APIs that knows how to read data
> >> source as a stream of record batches.
> >
> > We have a class called arrow::dataset::<unnamed>::OneShotFragment
> > which knows how to convert a python iterator into a scannable source.
> > This might serve your needs.  This was also written when things were
> > more dataset-oriented.  It might also be interesting to create a
> > python source node which does the same sort of thing, bypassing the
> > scanner, although I don't know that there would be much concrete
> > benefit.
> >
> > I hope this information is helpful!  It is just background though.  I
> > think I might need to understand your needs in a bit more detail
> > before I can offer any kind of prescriptive advice.
> >
> > On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ic...@gmail.com> wrote:
> >>
> >> Actually, "UDF" might be the wrong terminology here - This is more of a
> >> "custom Python data source" than "Python user defined functions". (Although
> >> under the hood it can probably reuse lots of the UDF logic to execute the
> >> custom data source)
> >>
> >> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ic...@gmail.com> wrote:
> >>
> >> > What Yaron is going for is really something similar to custom data source
> >> > in Spark (
> >> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
> >> > that allows utilizing existing Python APIs that knows how to read data
> >> > source as a stream of record batches.
> >> >
> >> >
> >> >

Re: data-source UDFs

Posted by Yaron Gvili <rt...@hotmail.com>.
Thanks for the detailed overview, Weston. I agree with David this would be very useful to have in a public doc.

Weston and David's discussion is a good one; however, I see it as separate from the discussion I brought up. The former is about facilities (like extension points) for implementing custom data sources in Arrow, whereas the latter is about facilities for integrating into PyArrow (existing or future) data sources written or wrapped in Python. In this latter discussion, I'm indifferent to the complexities of data-source implementation. I'm especially interested in feedback about the new function kind and the extensions of "cpp/src/arrow/python/udf.h" that I proposed, as well as possible alternatives to these, and more generally in reaching consensus about how a custom data source written or wrapped in Python would get integrated.

> > At the moment as we
> > are not exposing the execution engine primitives to Python user, are you
> > expecting to expose them by this approach.
>
> From our side, these APIs are not directly exposed to the end user, but
> rather, primitives that allow us to build on top of.

For clarity of discussion, I'd suggest distinguishing between a data-source integrator and an Acero user (or end user), since in many use cases these are not the same person. When I wrote "user", I meant a data-source integrator. An Acero user would not directly use the facilities I proposed.


Yaron.
________________________________
From: David Li <li...@apache.org>
Sent: Friday, June 3, 2022 5:53 PM
To: dev@arrow.apache.org <de...@arrow.apache.org>
Subject: Re: data-source UDFs

Thanks for the overview of the different extension points, it's nice to see this laid out. (It would be great to find a place in the docs for this, IMO, or possibly as a blog post?)

Just to chime in quickly here:

For databases/Flight, my hope is that integrating ADBC into Arrow Datasets will take care of both. Plain Flight isn't quite well-defined enough to be meaningfully integrated (except perhaps via a generic "stream of batches" entrypoint), and even if we wanted to feed JDBC/ODBC into an ExecPlan, we'd have to do some work that would look roughly like writing an ADBC driver, so we may as well go that route.

-David

On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> Efficiently reading from a data source is something that has a bit of
> complexity (parsing files, connecting to remote data sources, managing
> parallel reads, etc.)  Ideally we don't want users to have to reinvent
> these things as they go.  The datasets module in Arrow-C++ has a lot
> of code here already.
>
> So I think there is probably more than one extension point.  Some of
> these extension points already exist.  I do believe there is
> opportunity to create further extension points as well and the
> challenge / opportunity here will be figuring out what those are and
> what their API should be.
>
> ## I can describe a little bit about what we have already:
>
>  * Filesystem abstraction
>
> Right now we have a filesystem abstraction (arrow::fs::FileSystem)
> which is pretty well documented and straightforward.  This is how we
> can swap between local disk, S3, etc.  From an Acero / datasets
> perspective the API is basically "given a path, give me a stream of
> bytes" (open file) and "given a path, give me a list of files" (list
> directory).
>
>  * FileFormat abstraction
>
> The file format abstraction (arrow::dataset::FileFormat) is how we
> swap out different kinds of files.  For example,
> arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
> plenty of details)
>
>  - Convert input file to schema (inspect)
>  - Convert input file to a stream of batches (scan)
>  - Convert a stream of batches to an output file (write)
>
>  * Fragment / Dataset abstraction
>
> The fragment (arrow::dataset::Fragment) & dataset
> (arrow::dataset::Dataset) APIs are how we describe a collection of
> files.  This is used by the scanner to implement parallel reads.  You
> can think of these as the "source" API.  The APIs are roughly
>
>  - Convert a dataset to a stream fragments (list dataset)
>  - Convert a fragment to a stream of batches (scan)
>
> The two main implementations of datasets that we have today are
> FilesystemDataset (uses a filesystem to list files, each file is a
> fragment.  A filesystem fragment uses a format to convert its file to
> a stream of batches) and the InMemoryDataset (there is one fragment
> and the scan operation is just slicing off pieces of the in-memory
> data).  There are also some niche implementations here like a dataset
> that is created from a python iterable of batches.  This might be very
> similar to what you are describing above.
>
> A dataset must be created with a "dataset schema" which is the single
> schema that all fragments of the dataset can be converted to.
>
> * Custom source nodes
>
> All of the above is exposed to Acero via the scan node which is
> responsible for turning a dataset into Acero input.  However, the
> datasets API could be bypassed entirely to feed Acero in other ways.
> Some examples:
>
>  - The table source node is a way to feed in-memory data (a table)
> into Acero.  This is very similar to the InMemoryDataset but bypasses
> some of the overhead of the scanner.
>  - The TCP-H source node generates random data for benchmarking purposes.
>
> A lot of things can be expressed both as a simple dataset or a custom
> source node.  There is a bit of duplication here and I don't know that
> it matters too much.  I'm just pointing this out for pedantic
> purposes.
>
> The API here is just the ExecNode API and so the user needs to provide
> something that starts when StartProducing is called and then calls
> InputReceived on a regular basis.  From the discussions on the
> scheduler I suspect this API may be changing slightly but the idea is
> still there.
>
> ## I'm also aware of a number of things we are still going to need at
> some point.
>
>  * Evolution
>
> Sometimes different files in a dataset have different schemas.  A very
> common case is fields getting added over time or fields getting
> renamed or changing data type (e.g. int32 -> int64).  We have some
> support for the former but none for the latter.  I've got a pretty
> good idea of what the API looks like for "evolution" so if that is
> something needed I could write that up.
>
>  * Flight dataset/fragment/source-node?
>
> I don't think there will be exactly a "FlightFragment".  Maybe the
> right term is ADBC, but I haven't been following that discussion
> closely enough.  There needs to be a way to scan a remote data source
> that provides its data via a standard flight service.
>
>  * Sql dataset/fragment/source-node?
>
> It could be very useful to have a dataset that is capable of reading
> data from SQL datasets via something like JDBC (although, if ADBC
> connectors get built quickly enough, maybe this is never needed :)
>
>  * Catalogs
>
> The filesystem dataset currently figures out the dataset schema
> through a rather expensive inspection process and it lists its
> fragments using potentially expensive directory listing.  Metadata
> catalogs (e.g. hive) and table formats (e.g. iceberg) often have ways
> of storing precomputed versions of this information.  A dataset that
> is capable of figuring out what files to scan from a catalog would be
> valuable.  This dataset might use the same filesystem fragment to do
> the actual scan.
>
>  * Table metadata
>
> Very similar to the catalogs discussion is the idea of "table
> metadata".  This is less about reading data and more about describing
> the data.  For example, metadata about any ordering of the incoming
> data, unique constraints, not-null constraints, etc.  All of this
> information can be used by exec nodes to simplify query processing.
> For example, if you are grouping on a set of keys and one of the keys
> is ordered then you implement group by with a streaming (not pipeline
> breaking) implementation.
>
>> that allows utilizing existing Python APIs that knows how to read data
>> source as a stream of record batches.
>
> We have a class called arrow::dataset::<unnamed>::OneShotFragment
> which knows how to convert a python iterator into a scannable source.
> This might serve your needs.  This was also written when things were
> more dataset-oriented.  It might also be interesting to create a
> python source node which does the same sort of thing, bypassing the
> scanner, although I don't know that there would be much concrete
> benefit.
>
> I hope this information is helpful!  It is just background though.  I
> think I might need to understand your needs in a bit more detail
> before I can offer any kind of prescriptive advice.
>
> On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ic...@gmail.com> wrote:
>>
>> Actually, "UDF" might be the wrong terminology here - This is more of a
>> "custom Python data source" than "Python user defined functions". (Although
>> under the hood it can probably reuse lots of the UDF logic to execute the
>> custom data source)
>>
>> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ic...@gmail.com> wrote:
>>
>> > What Yaron is going for is really something similar to custom data source
>> > in Spark (
>> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
>> > that allows utilizing existing Python APIs that knows how to read data
>> > source as a stream of record batches.
>> >
>> >
>> >

Re: data-source UDFs

Posted by David Li <li...@apache.org>.
Thanks for the overview of the different extension points, it's nice to see this laid out. (It would be great to find a place in the docs for this, IMO, or possibly as a blog post?)

Just to chime in quickly here:

For databases/Flight, my hope is that integrating ADBC into Arrow Datasets will take care of both. Plain Flight isn't quite well-defined enough to be meaningfully integrated (except perhaps via a generic "stream of batches" entrypoint), and even if we wanted to feed JDBC/ODBC into an ExecPlan, we'd have to do some work that would look roughly like writing an ADBC driver, so we may as well go that route.

-David

On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> Efficiently reading from a data source is something that has a bit of
> complexity (parsing files, connecting to remote data sources, managing
> parallel reads, etc.)  Ideally we don't want users to have to reinvent
> these things as they go.  The datasets module in Arrow-C++ has a lot
> of code here already.
>
> So I think there is probably more than one extension point.  Some of
> these extension points already exist.  I do believe there is
> opportunity to create further extension points as well and the
> challenge / opportunity here will be figuring out what those are and
> what their API should be.
>
> ## I can describe a little bit about what we have already:
>
>  * Filesystem abstraction
>
> Right now we have a filesystem abstraction (arrow::fs::FileSystem)
> which is pretty well documented and straightforward.  This is how we
> can swap between local disk, S3, etc.  From an Acero / datasets
> perspective the API is basically "given a path, give me a stream of
> bytes" (open file) and "given a path, give me a list of files" (list
> directory).
>
>  * FileFormat abstraction
>
> The file format abstraction (arrow::dataset::FileFormat) is how we
> swap out different kinds of files.  For example,
> arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
> plenty of details)
>
>  - Convert input file to schema (inspect)
>  - Convert input file to a stream of batches (scan)
>  - Convert a stream of batches to an output file (write)
>
>  * Fragment / Dataset abstraction
>
> The fragment (arrow::dataset::Fragment) & dataset
> (arrow::dataset::Dataset) APIs are how we describe a collection of
> files.  This is used by the scanner to implement parallel reads.  You
> can think of these as the "source" API.  The APIs are roughly
>
>  - Convert a dataset to a stream fragments (list dataset)
>  - Convert a fragment to a stream of batches (scan)
>
> The two main implementations of datasets that we have today are
> FilesystemDataset (uses a filesystem to list files, each file is a
> fragment.  A filesystem fragment uses a format to convert its file to
> a stream of batches) and the InMemoryDataset (there is one fragment
> and the scan operation is just slicing off pieces of the in-memory
> data).  There are also some niche implementations here like a dataset
> that is created from a python iterable of batches.  This might be very
> similar to what you are describing above.
>
> A dataset must be created with a "dataset schema" which is the single
> schema that all fragments of the dataset can be converted to.
>
> * Custom source nodes
>
> All of the above is exposed to Acero via the scan node which is
> responsible for turning a dataset into Acero input.  However, the
> datasets API could be bypassed entirely to feed Acero in other ways.
> Some examples:
>
>  - The table source node is a way to feed in-memory data (a table)
> into Acero.  This is very similar to the InMemoryDataset but bypasses
> some of the overhead of the scanner.
>  - The TCP-H source node generates random data for benchmarking purposes.
>
> A lot of things can be expressed both as a simple dataset or a custom
> source node.  There is a bit of duplication here and I don't know that
> it matters too much.  I'm just pointing this out for pedantic
> purposes.
>
> The API here is just the ExecNode API and so the user needs to provide
> something that starts when StartProducing is called and then calls
> InputReceived on a regular basis.  From the discussions on the
> scheduler I suspect this API may be changing slightly but the idea is
> still there.
>
> ## I'm also aware of a number of things we are still going to need at
> some point.
>
>  * Evolution
>
> Sometimes different files in a dataset have different schemas.  A very
> common case is fields getting added over time or fields getting
> renamed or changing data type (e.g. int32 -> int64).  We have some
> support for the former but none for the latter.  I've got a pretty
> good idea of what the API looks like for "evolution" so if that is
> something needed I could write that up.
>
>  * Flight dataset/fragment/source-node?
>
> I don't think there will be exactly a "FlightFragment".  Maybe the
> right term is ADBC, but I haven't been following that discussion
> closely enough.  There needs to be a way to scan a remote data source
> that provides its data via a standard flight service.
>
>  * Sql dataset/fragment/source-node?
>
> It could be very useful to have a dataset that is capable of reading
> data from SQL datasets via something like JDBC (although, if ADBC
> connectors get built quickly enough, maybe this is never needed :)
>
>  * Catalogs
>
> The filesystem dataset currently figures out the dataset schema
> through a rather expensive inspection process and it lists its
> fragments using potentially expensive directory listing.  Metadata
> catalogs (e.g. hive) and table formats (e.g. iceberg) often have ways
> of storing precomputed versions of this information.  A dataset that
> is capable of figuring out what files to scan from a catalog would be
> valuable.  This dataset might use the same filesystem fragment to do
> the actual scan.
>
>  * Table metadata
>
> Very similar to the catalogs discussion is the idea of "table
> metadata".  This is less about reading data and more about describing
> the data.  For example, metadata about any ordering of the incoming
> data, unique constraints, not-null constraints, etc.  All of this
> information can be used by exec nodes to simplify query processing.
> For example, if you are grouping on a set of keys and one of the keys
> is ordered then you implement group by with a streaming (not pipeline
> breaking) implementation.
>
>> that allows utilizing existing Python APIs that knows how to read data
>> source as a stream of record batches.
>
> We have a class called arrow::dataset::<unnamed>::OneShotFragment
> which knows how to convert a python iterator into a scannable source.
> This might serve your needs.  This was also written when things were
> more dataset-oriented.  It might also be interesting to create a
> python source node which does the same sort of thing, bypassing the
> scanner, although I don't know that there would be much concrete
> benefit.
>
> I hope this information is helpful!  It is just background though.  I
> think I might need to understand your needs in a bit more detail
> before I can offer any kind of prescriptive advice.
>
> On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ic...@gmail.com> wrote:
>>
>> Actually, "UDF" might be the wrong terminology here - This is more of a
>> "custom Python data source" than "Python user defined functions". (Although
>> under the hood it can probably reuse lots of the UDF logic to execute the
>> custom data source)
>>
>> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ic...@gmail.com> wrote:
>>
>> > What Yaron is going for is really something similar to custom data source
>> > in Spark (
>> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
>> > that allows utilizing existing Python APIs that knows how to read data
>> > source as a stream of record batches.
>> >
>> >
>> >

Re: data-source UDFs

Posted by Weston Pace <we...@gmail.com>.
Efficiently reading from a data source is something that has a bit of
complexity (parsing files, connecting to remote data sources, managing
parallel reads, etc.).  Ideally we don't want users to have to reinvent
these things as they go.  The datasets module in Arrow-C++ has a lot
of code here already.

So I think there is probably more than one extension point.  Some of
these extension points already exist.  I do believe there is
opportunity to create further extension points as well and the
challenge / opportunity here will be figuring out what those are and
what their API should be.

## I can describe a little bit about what we have already:

 * Filesystem abstraction

Right now we have a filesystem abstraction (arrow::fs::FileSystem)
which is pretty well documented and straightforward.  This is how we
can swap between local disk, S3, etc.  From an Acero / datasets
perspective the API is basically "given a path, give me a stream of
bytes" (open file) and "given a path, give me a list of files" (list
directory).
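
For example, a minimal PyArrow sketch of those same two operations
(the paths are placeholders; swapping in fs.S3FileSystem() would look
the same):

    from pyarrow import fs

    local = fs.LocalFileSystem()

    # "given a path, give me a list of files" (list directory)
    infos = local.get_file_info(fs.FileSelector("data/", recursive=True))

    # "given a path, give me a stream of bytes" (open file)
    with local.open_input_stream("data/part-0.parquet") as stream:
        header = stream.read(16)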

 * FileFormat abstraction

The file format abstraction (arrow::dataset::FileFormat) is how we
swap out different kinds of files.  For example,
arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
plenty of details)

 - Convert input file to schema (inspect)
 - Convert input file to a stream of batches (scan)
 - Convert a stream of batches to an output file (write)
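
In PyArrow this surfaces roughly as follows (file paths are
placeholders):

    import pyarrow.dataset as ds

    fmt = ds.ParquetFileFormat()   # could equally be ds.CsvFileFormat(), ds.IpcFileFormat(), ...

    # inspect: input file -> schema
    schema = fmt.inspect("data/part-0.parquet")

    # scan: input file -> stream of batches (via a single-file dataset)
    single_file = ds.dataset("data/part-0.parquet", format=fmt)
    for batch in single_file.to_batches():
        pass

    # write: stream of batches -> output file(s)
    ds.write_dataset(single_file, "out/", format="parquet")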

 * Fragment / Dataset abstraction

The fragment (arrow::dataset::Fragment) & dataset
(arrow::dataset::Dataset) APIs are how we describe a collection of
files.  This is used by the scanner to implement parallel reads.  You
can think of these as the "source" API.  The APIs are roughly

 - Convert a dataset to a stream of fragments (list dataset)
 - Convert a fragment to a stream of batches (scan)

The two main implementations of datasets that we have today are the
FilesystemDataset (it uses a filesystem to list files, each file is a
fragment, and a filesystem fragment uses a format to convert its file
to a stream of batches) and the InMemoryDataset (there is one fragment
and the scan operation just slices off pieces of the in-memory data).
There are also some niche implementations here, like a dataset that is
created from a Python iterable of batches.  That might be very similar
to what you are describing above.

A dataset must be created with a "dataset schema" which is the single
schema that all fragments of the dataset can be converted to.
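
A rough PyArrow illustration of those two operations (the directory is
a placeholder):

    import pyarrow.dataset as ds

    dataset = ds.dataset("data/", format="parquet")   # a FilesystemDataset
    print(dataset.schema)                             # the single "dataset schema"

    # dataset -> stream of fragments (list dataset)
    for fragment in dataset.get_fragments():
        # fragment -> stream of batches (scan)
        for batch in fragment.to_batches():
            pass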

* Custom source nodes

All of the above is exposed to Acero via the scan node which is
responsible for turning a dataset into Acero input.  However, the
datasets API could be bypassed entirely to feed Acero in other ways.
Some examples:

 - The table source node is a way to feed in-memory data (a table)
into Acero.  This is very similar to the InMemoryDataset but bypasses
some of the overhead of the scanner.
 - The TPC-H source node generates random data for benchmarking purposes.

A lot of things can be expressed either as a simple dataset or as a
custom source node.  There is a bit of duplication here and I don't
know that it matters too much; I'm just pointing this out for
completeness.

The API here is just the ExecNode API and so the user needs to provide
something that starts when StartProducing is called and then calls
InputReceived on a regular basis.  From the discussions on the
scheduler I suspect this API may be changing slightly but the idea is
still there.

## I'm also aware of a number of things we are still going to need at
some point.

 * Evolution

Sometimes different files in a dataset have different schemas.  A very
common case is fields getting added over time or fields getting
renamed or changing data type (e.g. int32 -> int64).  We have some
support for the former but none for the latter.  I've got a pretty
good idea of what the API looks like for "evolution" so if that is
something needed I could write that up.
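
For the "fields added over time" case that is partly supported today,
a sketch of the current workaround is to supply an explicit dataset
schema (directories are placeholders; type changes like int32 -> int64
remain the unsupported part):

    import pyarrow as pa
    import pyarrow.dataset as ds

    # Two "generations" of the same data; assume the 2022 files added a column.
    unified = pa.unify_schemas([
        ds.dataset("data/2021/", format="parquet").schema,
        ds.dataset("data/2022/", format="parquet").schema,
    ])

    # Re-open both against the unified schema; files missing the newer
    # fields are read with nulls filled in.
    combined = ds.dataset([
        ds.dataset("data/2021/", format="parquet", schema=unified),
        ds.dataset("data/2022/", format="parquet", schema=unified),
    ])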

 * Flight dataset/fragment/source-node?

I don't think there will be exactly a "FlightFragment".  Maybe the
right term is ADBC, but I haven't been following that discussion
closely enough.  There needs to be a way to scan a remote data source
that provides its data via a standard flight service.

 * Sql dataset/fragment/source-node?

It could be very useful to have a dataset that is capable of reading
data from SQL databases via something like JDBC (although, if ADBC
connectors get built quickly enough, maybe this is never needed :)
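
Until then, the manual route from Python is to pull rows through a
DBAPI driver and build Arrow batches yourself; a self-contained sketch
using sqlite3 as a stand-in (table and column names are made up):

    import sqlite3
    import pyarrow as pa

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (id INTEGER, value REAL)")
    conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, 1.5), (2, 2.5)])
    cursor = conn.execute("SELECT id, value FROM t")

    schema = pa.schema([("id", pa.int64()), ("value", pa.float64())])

    def sql_batches(cursor, schema, chunk_size=10_000):
        # Pull rows in chunks and convert each chunk to a record batch.
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:
                break
            columns = list(zip(*rows))
            yield pa.record_batch(
                [pa.array(col, type=field.type)
                 for col, field in zip(columns, schema)],
                schema=schema)

    table = pa.Table.from_batches(sql_batches(cursor, schema), schema=schema)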

 * Catalogs

The filesystem dataset currently figures out the dataset schema
through a rather expensive inspection process and it lists its
fragments using potentially expensive directory listing.  Metadata
catalogs (e.g. hive) and table formats (e.g. iceberg) often have ways
of storing precomputed versions of this information.  A dataset that
is capable of figuring out what files to scan from a catalog would be
valuable.  This dataset might use the same filesystem fragment to do
the actual scan.

 * Table metadata

Very similar to the catalogs discussion is the idea of "table
metadata".  This is less about reading data and more about describing
the data.  For example, metadata about any ordering of the incoming
data, unique constraints, not-null constraints, etc.  All of this
information can be used by exec nodes to simplify query processing.
For example, if you are grouping on a set of keys and one of the keys
is ordered, then group-by can be implemented with a streaming (not
pipeline-breaking) implementation.

> that allows utilizing existing Python APIs that knows how to read data
> source as a stream of record batches.

We have a class called arrow::dataset::<unnamed>::OneShotFragment
which knows how to convert a python iterator into a scannable source.
This might serve your needs.  This was also written when things were
more dataset-oriented.  It might also be interesting to create a
python source node which does the same sort of thing, bypassing the
scanner, although I don't know that there would be much concrete
benefit.
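
For reference, I believe the closest thing reachable from Python today
is roughly Scanner.from_batches, which wraps an iterator of record
batches into something that can be scanned once (the generator below
is a stand-in for whatever Python API actually produces the data):

    import pyarrow as pa
    import pyarrow.dataset as ds

    schema = pa.schema([("x", pa.int64())])

    def batches():
        for i in range(3):
            yield pa.record_batch([pa.array([i, i + 1])], schema=schema)

    scanner = ds.Scanner.from_batches(batches(), schema=schema)
    table = scanner.to_table()   # or ds.write_dataset(scanner, "out/", format="parquet")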

I hope this information is helpful!  It is just background though.  I
think I might need to understand your needs in a bit more detail
before I can offer any kind of prescriptive advice.

On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ic...@gmail.com> wrote:
>
> Actually, "UDF" might be the wrong terminology here - This is more of a
> "custom Python data source" than "Python user defined functions". (Although
> under the hood it can probably reuse lots of the UDF logic to execute the
> custom data source)
>
> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ic...@gmail.com> wrote:
>
> > What Yaron is going for is really something similar to custom data source
> > in Spark (
> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
> > that allows utilizing existing Python APIs that knows how to read data
> > source as a stream of record batches.
> >
> >
> >

Re: data-source UDFs

Posted by Li Jin <ic...@gmail.com>.
Actually, "UDF" might be the wrong terminology here - This is more of a
"custom Python data source" than "Python user defined functions". (Although
under the hood it can probably reuse lots of the UDF logic to execute the
custom data source)

On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ic...@gmail.com> wrote:

> What Yaron is going for is really something similar to custom data source
> in Spark (
> https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
> that allows utilizing existing Python APIs that knows how to read data
> source as a stream of record batches.
>
>
>

Re: data-source UDFs

Posted by Li Jin <ic...@gmail.com>.
What Yaron is going for is really something similar to a custom data source
in Spark (
https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
that allows utilizing existing Python APIs that know how to read a data
source as a stream of record batches.
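
In other words, the integrator's Python code would just yield record
batches and the engine would treat that as a source, e.g. (purely
illustrative):

    import pyarrow as pa

    def read_my_storage(path):
        # Stand-in for an existing Python client that already knows how
        # to read `path`; each chunk becomes one Arrow record batch.
        for chunk_id in range(3):
            yield pa.RecordBatch.from_pydict({"chunk": [chunk_id], "path": [path]})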

Re: data-source UDFs

Posted by Li Jin <ic...@gmail.com>.
> At the moment as we
> are not exposing the execution engine primitives to Python user, are you
> expecting to expose them by this approach.

From our side, these APIs are not directly exposed to the end user; rather,
they are primitives that allow us to build on top of them.

The end user would just do something like this (not the actual API, just to
give some idea):

from data import some_storage   # "data" and "some_storage" are illustrative names

table = some_storage(path)      # the custom source, exposed as a plain Python call
result = run_query(table)

But we will use the data-source UDF primitives to implement "some_storage".
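
Roughly, "some_storage" itself would be a thin wrapper over that
primitive, along these lines (purely illustrative; register_source_udf
and the ctx argument stand for the proposed, not-yet-existing pieces):

    import pyarrow as pa

    def _storage_batches(path):
        # Existing Python reader, wrapped to yield Arrow record batches;
        # the range() loop is a stand-in for real I/O against `path`.
        for chunk_id in range(3):
            yield pa.RecordBatch.from_pydict({"chunk": [chunk_id]})

    def some_storage(path):
        # Hypothetical call into the proposed data-source UDF primitive;
        # register_source_udf and its signature are illustrative only.
        return register_source_udf("some_storage",
                                   lambda ctx: _storage_batches(path))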

On Fri, Jun 3, 2022 at 1:53 PM Vibhatha Abeykoon <vi...@gmail.com> wrote:

> First of all, this is a nice discussion, but I have a doubt.
>
> I have a question regarding the simplicity of things. At the moment as we
> are not exposing the execution engine primitives to Python user, are you
> expecting to expose them by this approach?
>
> On Fri, Jun 3, 2022 at 9:02 PM Yaron Gvili <rt...@hotmail.com> wrote:
>
> > Hi,
> >
> > I'm working on support for data-source UDFs and would like to get
> feedback
> > about the design I have in mind for it.
> >
> > By support for data-source UDFs, at a basic level, I mean enabling a user
> > to define using PyArrow APIs a record-batch-generating function
> implemented
> > in Python that would be easily plugged into a source-node in a
> > streaming-engine execution plan. Such functions are similar to the
> existing
> > scalar UDFs with zero inputs, but an important difference is that scalar
> > UDFs are plugged and composed in expressions whereas data-source UDFs
> would
> > be plugged into a source-node.
> >
> > Focusing on the Arrow and PyArrow parts (I'm leaving the Ibis and
> > Ibis-Substrait parts out), the design I have in mind includes:
> >
> >   *   In Arrow: Adding a new source-UDF kind of arrow::compute::Function,
> > for functions that generate data. Such functions would be registered in a
> > FunctionRegistry but not used in scalar expressions nor composed.
> >   *   In Arrow: Adding SourceUdfContext and SourceUdfOptions (similar to
> > ScalarUdfContext and ScalarUdfOptions) in "cpp/src/arrow/python/udf.h".
> >   *   In Arrow: Adding a UdfSourceExecNode into which a (source-UDF-kind
> > of) function can be plugged.
> >   *   In PyArrow: Following the design of scalar UDFs, and hopefully
> > reusing much of it.
> >
> > Cheers,
> > Yaron.
> >
> --
> Vibhatha Abeykoon
>

Re: data-source UDFs

Posted by Vibhatha Abeykoon <vi...@gmail.com>.
First of all, this is a nice discussion, but I have a doubt.

I have a question regarding the simplicity of things. At the moment, since we
are not exposing the execution engine primitives to the Python user, are you
expecting to expose them via this approach?

On Fri, Jun 3, 2022 at 9:02 PM Yaron Gvili <rt...@hotmail.com> wrote:

> Hi,
>
> I'm working on support for data-source UDFs and would like to get feedback
> about the design I have in mind for it.
>
> By support for data-source UDFs, at a basic level, I mean enabling a user
> to define using PyArrow APIs a record-batch-generating function implemented
> in Python that would be easily plugged into a source-node in a
> streaming-engine execution plan. Such functions are similar to the existing
> scalar UDFs with zero inputs, but an important difference is that scalar
> UDFs are plugged and composed in expressions whereas data-source UDFs would
> be plugged into a source-node.
>
> Focusing on the Arrow and PyArrow parts (I'm leaving the Ibis and
> Ibis-Substrait parts out), the design I have in mind includes:
>
>   *   In Arrow: Adding a new source-UDF kind of arrow::compute::Function,
> for functions that generate data. Such functions would be registered in a
> FunctionRegistry but not used in scalar expressions nor composed.
>   *   In Arrow: Adding SourceUdfContext and SourceUdfOptions (similar to
> ScalarUdfContext and ScalarUdfOptions) in "cpp/src/arrow/python/udf.h".
>   *   In Arrow: Adding a UdfSourceExecNode into which a (source-UDF-kind
> of) function can be plugged.
>   *   In PyArrow: Following the design of scalar UDFs, and hopefully
> reusing much of it.
>
> Cheers,
> Yaron.
>
-- 
Vibhatha Abeykoon