Posted to dev@arrow.apache.org by Micah Kornfield <em...@gmail.com> on 2022/11/02 02:09:07 UTC

Re: pyarrow dataset API

Moving the conversation to dev@, which is a more appropriate place to discuss this.

On Tuesday, November 1, 2022, Chang She <ch...@eto.ai> wrote:

> Hi there,
>
> The pyarrow dataset API is marked experimental so I'm curious if y'all
> have made any decisions on it for upcoming releases. Specifically, any
> thoughts on making the Scanner and things like FileSystemDataset part of
> the "public API" (i.e., putting declarations in the _dataset.pxd)? It would
> make it a lot easier for new data formats to be built on top of the Arrow
> platform. e.g., Lance supports efficient partial reads from s3 for
> limit/offset (via additional ScanOptions), but currently it's difficult to
> expose the scanner to the rest of Arrow. Instead we subclass Dataset and
> return a custom scanner we created. And our Dataset subclass *should* be a
> FileSystemDataset subclass, but FileSystemDataset is not "public API" etc.
> Happy to discuss additional details, for reference:
> github.com/eto-ai/lance
>
> Thanks!
>
> Chang
>

Re: pyarrow dataset API

Posted by Weston Pace <we...@gmail.com>.
> Do you have documentation for how you're envisioning schema evolution to work in Arrow?

So far I've been thinking about schema evolution purely in terms of
"what does the scanner need?" and the APIs I came up with are
documented here[1].  To summarize:

 * Each fragment has a "fragment schema".  This is the devolved
schema.  There is no expectation these schemas are the same between
fragments.  All batches in a fragment should have the same fragment
schema.
 * Each dataset has a "dataset schema".  This is the evolved schema.
The evolution strategy needs to be able to convert batches from
fragments to this schema.  All batches output by the scanner will
correspond to this schema.

There are a few operations:

 * Devolve filter: In scan options a pushdown filter is originally
expressed against the dataset schema.  The evolution strategy needs to
be able to devolve this filter (expression in, expression out) to an
expression that is bound to the fragment schema.
 * Devolve selection: In scan options the user picks what fields they
want from the dataset schema.  The evolution strategy needs to convert
this selection to a selection of fields from the fragment schema.
 * Evolve batch: A batch read from a fragment will have the fragment
schema.  The evolution strategy needs to evolve this batch to have the
dataset schema.  Note that this is also the point where we switch from
RecordBatch to ExecBatch, which allows the evolution strategy to use a
scalar (e.g. a null scalar for a missing column).
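
To put that in rough python terms (purely an illustrative sketch; the
real hooks live in the C++ API in [1] and none of these names are
final), a strategy boils down to three hooks plus the dataset schema:

    import pyarrow as pa
    import pyarrow.compute as pc

    class EvolutionStrategy:
        """Illustrative sketch of the per-fragment evolution hooks."""

        def __init__(self, dataset_schema: pa.Schema):
            self.dataset_schema = dataset_schema

        def devolve_filter(self, filter_expr: pc.Expression,
                           fragment_schema: pa.Schema) -> pc.Expression:
            # Rewrite a filter expressed against the dataset schema into
            # one bound to the fragment schema.
            raise NotImplementedError

        def devolve_selection(self, columns, fragment_schema: pa.Schema):
            # Map the selected dataset-schema fields to fragment-schema
            # fields, dropping any field the fragment does not have.
            raise NotImplementedError

        def evolve_batch(self, batch: pa.RecordBatch) -> pa.RecordBatch:
            # Convert a batch with the fragment schema into a batch with
            # the dataset schema (e.g. by inserting nulls for missing
            # columns).
            raise NotImplementedError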

To aid in the "devolve filter" operation there is a "get guarantee"
operation.  This allows an evolution strategy to supply a guarantee
that the pushdown filter will be simplified against.  For example,
when a column is missing the guarantee is usually "x is null" and a
filter that looks like y > 0 && x < 50 will simplify to false.
A strategy could potentially implement only "get guarantee" and not
have to worry about "devolve filter" at all.
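
This is essentially how partition pruning already works in the python
datasets API: each fragment's partition_expression acts as the
guarantee, and a pushdown filter that contradicts it simplifies to
false, so the fragment is skipped entirely.  A small sketch (the path
is just a temp directory for illustration):

    import tempfile
    import pyarrow as pa
    import pyarrow.dataset as ds

    base = tempfile.mkdtemp()
    table = pa.table({"x": [1, 1, 2, 2], "y": [10, 20, 30, 40]})
    ds.write_dataset(
        table, base, format="parquet",
        partitioning=ds.partitioning(pa.schema([("x", pa.int64())]),
                                     flavor="hive"))

    dataset = ds.dataset(base, format="parquet", partitioning="hive")

    # Each fragment carries a guarantee, e.g. "x == 1" for the x=1 dir.
    for fragment in dataset.get_fragments():
        print(fragment.partition_expression)

    # A filter contradicting a fragment's guarantee simplifies to false,
    # so that fragment is never even opened.
    print(len(list(dataset.get_fragments(filter=ds.field("x") == 1))))  # 1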

The initial strategy I am considering roughly corresponds to what we
do today: it is name-based and supports missing / reordered columns.
To devolve a numeric field reference, the field is looked up in the
dataset schema, a field with the same name is looked for in the
fragment schema, and the index of that discovered field is used
(or the field is assumed to be missing if no field with that name
is found).

 * Devolve filter: Filters are simplified against a guarantee like
"x is null" for each missing column x.  Numeric field references are
modified as described above.
 * Devolve selection: Field references are modified as described
above.  Missing fields are dropped from the selection.
 * Evolve batch: If a field was missing we insert a null scalar for that column.
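
For the "evolve batch" piece of that strategy, a python-flavored
equivalent would look roughly like this (just an illustration, not the
actual implementation; here missing columns are materialized as
all-null arrays, whereas the real design could keep them as a single
null scalar in the ExecBatch):

    import pyarrow as pa

    def evolve_batch(batch: pa.RecordBatch,
                     dataset_schema: pa.Schema) -> pa.RecordBatch:
        """Pad and reorder a fragment batch to the dataset schema by name.

        Missing columns become all-null arrays.  Duplicate names and
        renames are not handled.
        """
        columns = []
        for field in dataset_schema:
            idx = batch.schema.get_field_index(field.name)
            if idx == -1:
                # Missing in this fragment: substitute typed nulls.
                columns.append(pa.nulls(batch.num_rows, type=field.type))
            else:
                columns.append(batch.column(idx))
        return pa.RecordBatch.from_arrays(columns, schema=dataset_schema)

    dataset_schema = pa.schema([("x", pa.int64()), ("y", pa.float64())])
    fragment_batch = pa.RecordBatch.from_pydict({"y": [1.0, 2.0]})  # no "x"
    print(evolve_batch(fragment_batch, dataset_schema))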

There are a few thorny situations:

 * Columns with duplicate names don't work well (I don't remember if
we reject or do best-effort)
 * Doesn't support renaming columns

[1] https://github.com/apache/arrow/blob/8074496cb41bc8ec8fe9fc814ca5576d89a6eb94/cpp/src/arrow/dataset/dataset.h#L248

On Thu, Nov 3, 2022 at 6:26 AM Chang She <ch...@eto.ai> wrote:
>
> Thanks Weston!
>
> *> FileSystemDataset in PXD*
>
> Sorry, I was ambiguous about that. I was referring to the cdef class in
> https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset.pyx#L542
> . Previously this mattered because upstream tooling like duckdb would do an
> isinstance check against FileSystemDataset. However,
> https://github.com/duckdb/duckdb/issues/5163 has since been fixed, so this
> is no longer an issue.
>
> *> dataset & scanner are not considered to be an extension point*
>
> OK, good to know, thanks. The reason we created subclasses here was that
> the existing scanner didn't have options to push down offset/limit
> (among other things). For computer vision, where you have large blobs, it
> can be prohibitively expensive to read the whole image / tensor
> column into memory first.
>
> Excited to hear you're working on offset / limit => we'd be super happy to
> ditch our custom Dataset/Scanner workarounds. Lance is pretty early so I'm
> not concerned about backwards compat yet. And this would help us be more
> tightly integrated into the Arrow ecosystem.
>
>
> *Schema evolution*
>
> We actually just shipped dataset versioning for Lance today. Currently it
> supports appends, but we're working on full schema evolution support. (We
> had to do this outside of Iceberg because we're not using Parquet.) Do you
> have documentation for how you're envisioning schema evolution to work in
> Arrow? Would you be open to chatting with us about how we can plan for
> these changes?
>
>
> Anyways, excited about the roadmap you listed and thanks for the discussion!
>
> -C
>
>
>
> > ---------- Forwarded message ---------
> > From: Weston Pace <we...@gmail.com>
> > Date: Wed, Nov 2, 2022 at 4:55 PM
> > Subject: Re: pyarrow dataset API
> > To: <de...@arrow.apache.org>
> >
> >
> > FileSystemDataset is part of the public API (and in a pxd file[1]).  I
> > would agree it's fair to say that pyarrow datasets are no longer
> > experimental.
> >
> > > Instead we subclass Dataset and return a custom scanner we created. And
> > our Dataset subclass *should* be a FileSystemDataset subclass, but
> > FileSystemDataset is not "public API" etc.
> >
> > Hmm, perhaps the problem is that dataset & scanner are not considered
> > to be an extension point, compared to something like FileFormat or
> > Fragment.  I would argue that even something like FileSystemDataset
> > probably doesn't need to be a child class of Dataset but is more a
> > combination of:
> >
> >  * Dataset discovery (this is already a standalone utility)
> >  * FileFragment
> >  * Dataset write (this is already a static method)
> >
> > A "dataset" then is more of a container (e.g. a collection of
> > fragments and a schema) and not something that actually has
> > functionality.
> >
> > Setting this aside for a moment, I have been slowly working on new
> > scanning functionality[2].  I've been hoping to support offset/limit
> > natively within the new scanner work, but haven't had time to get to
> > it yet. Some other goals:
> >
> >  * Better support for cancellation and error handling (currently, on
> > errors, we continue to read files even after the scan completes, which
> > can lead to further errors / crashes)
> >  * Formally defining schema evolution, with the hope of allowing
> > others to add more sophisticated approaches here (e.g. parquet column
> > id for integration with iceberg)
> >  * More control over scheduling and, eventually, a better ordering of
> > scan tasks to facilitate in-order traversal
> >  * Native limit / offset which could run in parallel and still prevent
> > over-read (except for some potential over-read of metadata when
> > running in parallel) for nice formats (e.g. not CSV or JSON)
> >  * Simpler scan options (e.g. projection is very confusing in the scan
> > options today, ability to specify readahead limits in bytes, etc.)
> >  * Simpler implementation (switch from merge generator to async task
> > scheduler)
> >
> > Unfortunately, it may be a breaking change.  I think I can adapt the
> > existing fragment API onto the new fragment API[3] (which is hopefully
> > simpler).  But your changes to scanner/dataset might not map as
> > cleanly.  When I get close to the point of switching (right now I'm
> > hoping to get this wrapped up before or during the winter holidays)
> > I'd like to work with you to ensure we can get lance working with the
> > new scanner as well.
> >
> > [1]
> > https://github.com/apache/arrow/blob/5e53978b56aa13f9c033f2e849cc22f2aed6e2d3/python/pyarrow/includes/libarrow_dataset.pxd#L236
> > [2]
> > https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/scan_node.cc
> > [3]
> > https://github.com/apache/arrow/blob/5e53978b56aa13f9c033f2e849cc22f2aed6e2d3/cpp/src/arrow/dataset/dataset.h#L162
> >
> > On Tue, Nov 1, 2022 at 7:10 PM Micah Kornfield <em...@gmail.com>
> > wrote:
> > >
> > > Moving the conversation to dev@, which is a more appropriate place to discuss this.
> > >
> > > On Tuesday, November 1, 2022, Chang She <ch...@eto.ai> wrote:
> > >
> > > > Hi there,
> > > >
> > > > The pyarrow dataset API is marked experimental so I'm curious if y'all
> > > > have made any decisions on it for upcoming releases. Specifically, any
> > > > thoughts on making the Scanner and things like FileSystemDataset part
> > of
> > > > the "public API" (i.e., putting declarations in the _dataset.pxd)? It
> > would
> > > > make it a lot easier for new data formats to be built on top of the
> > Arrow
> > > > platform. e.g., Lance supports efficient partial reads from s3 for
> > > > limit/offset (via additional ScanOptions), but currently it's
> > difficult to
> > > > expose the scanner to the rest of Arrow. Instead we subclass Dataset
> > and
> > > > return a custom scanner we created. And our Dataset subclass *should*
> > be a
> > > > FileSystemDataset subclass, but FileSystemDataset is not "public API"
> > etc.
> > > > Happy to discuss additional details, for reference:
> > > > github.com/eto-ai/lance
> > > >
> > > > Thanks!
> > > >
> > > > Chang
> > > >
> >
> >
> > --
> > Lei Xu
> > Eto Labs <https://eto.ai> | Calendly <https://calendly.com/leixu/30min>
> >
> >

Re: pyarrow dataset API

Posted by Chang She <ch...@eto.ai>.
Thanks Weston!

*> FileSystemDataset in PXD*

Sorry, I was ambiguous about that. I was referring to the cdef class in
https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset.pyx#L542
. Previously this mattered because upstream tooling like duckdb would do an
isinstance check against FileSystemDataset. However,
https://github.com/duckdb/duckdb/issues/5163 has since been fixed, so this
is no longer an issue.
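
i.e. the kind of check that used to trip us up was roughly the
following (illustrative only, not duckdb's actual code; the function
name is made up):

    import pyarrow.dataset as ds

    def can_handle(dataset: ds.Dataset) -> bool:
        # Our Lance dataset subclasses ds.Dataset but not
        # ds.FileSystemDataset, so a check like this used to reject it.
        return isinstance(dataset, ds.FileSystemDataset)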

*> dataset & scanner are not considered to be an extension point*

OK, good to know, thanks. The reason we created subclasses here was that
the existing scanner didn't have options to push down offset/limit
(among other things). For computer vision, where you have large blobs, it
can be prohibitively expensive to read the whole image / tensor
column into memory first.
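
For context, with the stock API today one can approximate this with
head() / take() along these lines (column and path names are made up),
but that is not the same as a real offset/limit pushdown in
ScanOptions:

    import pyarrow as pa
    import pyarrow.dataset as ds

    dataset = ds.dataset("s3://my-bucket/images/", format="parquet")
    scanner = dataset.scanner(columns=["image", "label"])

    first_100 = scanner.head(100)  # roughly "limit 100"
    next_100 = dataset.take(pa.array(range(100, 200)),  # "offset 100, limit 100"
                            columns=["image", "label"])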

Excited to hear you're working on offset / limit => we'd be super happy to
ditch our custom Dataset/Scanner workarounds. Lance is pretty early so I'm
not concerned about backwards compat yet. And this would help us be more
tightly integrated into the Arrow ecosystem.


*Schema evolution*

We actually just shipped dataset versioning for Lance today. Currently it
supports appends, but we're working on full schema evolution support. (We
had to do this outside of Iceberg because we're not using Parquet.) Do you
have documentation for how you're envisioning schema evolution to work in
Arrow? Would you be open to chatting with us about how we can plan for
these changes?


Anyways, excited about the roadmap you listed and thanks for the discussion!

-C



> ---------- Forwarded message ---------
> From: Weston Pace <we...@gmail.com>
> Date: Wed, Nov 2, 2022 at 4:55 PM
> Subject: Re: pyarrow dataset API
> To: <de...@arrow.apache.org>
>
>
> FileSystemDataset is part of the public API (and in a pxd file[1]).  I
> would agree it's fair to say that pyarrow datasets are no longer
> experimental.
>
> > Instead we subclass Dataset and return a custom scanner we created. And
> our Dataset subclass *should* be a FileSystemDataset subclass, but
> FileSystemDataset is not "public API" etc.
>
> Hmm, perhaps the problem is that dataset & scanner are not considered
> to be an extension point, compared to something like FileFormat or
> Fragment.  I would argue that even something like FileSystemDataset
> probably doesn't need to be a child class of Dataset but is more a
> combination of:
>
>  * Dataset discovery (this is already a standalone utility)
>  * FileFragment
>  * Dataset write (this is already a static method)
>
> A "dataset" then is more of a container (e.g. a collection of
> fragments and a schema) and not something that actually has
> functionality.
>
> Setting this aside for a moment, I have been slowly working on new
> scanning functionality[2].  I've been hoping to support offset/limit
> natively within the new scanner work, but haven't had time to get to
> it yet. Some other goals:
>
>  * Better support for cancellation and error handling (currently, on
> errors, we continue to read files even after the scan completes, which
> can lead to further errors / crashes)
>  * Formally defining schema evolution, with the hope of allowing
> others to add more sophisticated approaches here (e.g. parquet column
> id for integration with iceberg)
>  * More control over scheduling and, eventually, a better ordering of
> scan tasks to facilitate in-order traversal
>  * Native limit / offset which could run in parallel and still prevent
> over-read (except for some potential over-read of metadata when
> running in parallel) for nice formats (e.g. not CSV or JSON)
>  * Simpler scan options (e.g. projection is very confusing in the scan
> options today, ability to specify readahead limits in bytes, etc.)
>  * Simpler implementation (switch from merge generator to async task
> scheduler)
>
> Unfortunately, it may be a breaking change.  I think I can adapt the
> existing fragment API onto the new fragment API[3] (which is hopefully
> simpler).  But your changes to scanner/dataset might not map as
> cleanly.  When I get close to the point of switching (right now I'm
> hoping to get this wrapped up before or during the winter holidays)
> I'd like to work with you to ensure we can get lance working with the
> new scanner as well.
>
> [1]
> https://github.com/apache/arrow/blob/5e53978b56aa13f9c033f2e849cc22f2aed6e2d3/python/pyarrow/includes/libarrow_dataset.pxd#L236
> [2]
> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/scan_node.cc
> [3]
> https://github.com/apache/arrow/blob/5e53978b56aa13f9c033f2e849cc22f2aed6e2d3/cpp/src/arrow/dataset/dataset.h#L162
>
> On Tue, Nov 1, 2022 at 7:10 PM Micah Kornfield <em...@gmail.com>
> wrote:
> >
> > Moving the conversation to dev@, which is a more appropriate place to discuss this.
> >
> > On Tuesday, November 1, 2022, Chang She <ch...@eto.ai> wrote:
> >
> > > Hi there,
> > >
> > > The pyarrow dataset API is marked experimental so I'm curious if y'all
> > > have made any decisions on it for upcoming releases. Specifically, any
> > > thoughts on making the Scanner and things like FileSystemDataset part
> of
> > > the "public API" (i.e., putting declarations in the _dataset.pxd)? It
> would
> > > make it a lot easier for new data formats to be built on top of the
> Arrow
> > > platform. e.g., Lance supports efficient partial reads from s3 for
> > > limit/offset (via additional ScanOptions), but currently it's
> difficult to
> > > expose the scanner to the rest of Arrow. Instead we subclass Dataset
> and
> > > return a custom scanner we created. And our Dataset subclass *should*
> be a
> > > FileSystemDataset subclass, but FileSystemDataset is not "public API"
> etc.
> > > Happy to discuss additional details, for reference:
> > > github.com/eto-ai/lance
> > >
> > > Thanks!
> > >
> > > Chang
> > >
>
>
> --
> Lei Xu
> Eto Labs <https://eto.ai> | Calendly <https://calendly.com/leixu/30min>
>
>

Re: pyarrow dataset API

Posted by Weston Pace <we...@gmail.com>.
FileSystemDataset is part of the public API (and in a pxd file[1]).  I
would agree it's fair to say that pyarrow datasets are no longer
experimental.

> Instead we subclass Dataset and return a custom scanner we created. And our Dataset subclass *should* be a FileSystemDataset subclass, but FileSystemDataset is not "public API" etc.

Hmm, perhaps the problem is that dataset & scanner are not considered
to be an extension point, compared to something like FileFormat or
Fragment.  I would argue that even something like FileSystemDataset
probably doesn't need to be a child class of Dataset but is more a
combination of:

 * Dataset discovery (this is already a standalone utility)
 * FileFragment
 * Dataset write (this is already a static method)

A "dataset" then is more of a container (e.g. a collection of
fragments and a schema) and not something that actually has
functionality.
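
Put differently, each of those pieces is already reachable on its own
from python today, roughly (paths are placeholders):

    import pyarrow.dataset as ds

    # Discovery: a standalone factory producing a schema + fragments
    dataset = ds.dataset("path/to/data", format="parquet")

    # The "container" part: just a schema and a collection of fragments
    schema = dataset.schema
    fragments = list(dataset.get_fragments())

    # Write: already effectively a standalone utility
    ds.write_dataset(dataset, "path/to/copy", format="parquet")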

Setting this aside for a moment, I have been slowly working on new
scanning functionality[2].  I've been hoping to support offset/limit
natively within the new scanner work, but haven't had time to get to
it yet. Some other goals:

 * Better support for cancellation and error handling (currently, on
errors, we continue to read files even after the scan completes, which
can lead to further errors / crashes)
 * Formally defining schema evolution, with the hope of allowing
others to add more sophisticated approaches here (e.g. parquet column
id for integration with iceberg)
 * More control over scheduling and, eventually, a better ordering of
scan tasks to facilitate in-order traversal
 * Native limit / offset which could run in parallel and still prevent
over-read (except for some potential over-read of metadata when
running in parallel) for nice formats (e.g. not CSV or JSON)
 * Simpler scan options (e.g. projection is very confusing in the scan
options today, ability to specify readahead limits in bytes, etc.)
 * Simpler implementation (switch from merge generator to async task scheduler)

Unfortunately, it may be a breaking change.  I think I can adapt the
existing fragment API onto the new fragment API[3] (which is hopefully
simpler).  But your changes to scanner/dataset might not map as
cleanly.  When I get close to the point of switching (right now I'm
hoping to get this wrapped up before or during the winter holidays)
I'd like to work with you to ensure we can get Lance working with the
new scanner as well.

[1] https://github.com/apache/arrow/blob/5e53978b56aa13f9c033f2e849cc22f2aed6e2d3/python/pyarrow/includes/libarrow_dataset.pxd#L236
[2] https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/scan_node.cc
[3] https://github.com/apache/arrow/blob/5e53978b56aa13f9c033f2e849cc22f2aed6e2d3/cpp/src/arrow/dataset/dataset.h#L162

On Tue, Nov 1, 2022 at 7:10 PM Micah Kornfield <em...@gmail.com> wrote:
>
> Moving the conversation to dev@, which is a more appropriate place to discuss this.
>
> On Tuesday, November 1, 2022, Chang She <ch...@eto.ai> wrote:
>
> > Hi there,
> >
> > The pyarrow dataset API is marked experimental so I'm curious if y'all
> > have made any decisions on it for upcoming releases. Specifically, any
> > thoughts on making the Scanner and things like FileSystemDataset part of
> > the "public API" (i.e., putting declarations in the _dataset.pxd)? It would
> > make it a lot easier for new data formats to be built on top of the Arrow
> > platform. e.g., Lance supports efficient partial reads from s3 for
> > limit/offset (via additional ScanOptions), but currently it's difficult to
> > expose the scanner to the rest of Arrow. Instead we subclass Dataset and
> > return a custom scanner we created. And our Dataset subclass *should* be a
> > FileSystemDataset subclass, but FileSystemDataset is not "public API" etc.
> > Happy to discuss additional details, for reference:
> > github.com/eto-ai/lance
> >
> > Thanks!
> >
> > Chang
> >