Posted to dev@arrow.apache.org by Rusty Conover <ru...@conover.me.INVALID> on 2023/05/29 14:26:03 UTC

[DISCUSS] Acero's ScanNode and Row Indexing across Scans

Hi Arrow Team,

I wanted to suggest an improvement regarding Acero's Scan node.
Currently, it provides useful information such as __fragment_index,
__batch_index, __filename, and __last_in_fragment. However, it would
be beneficial to have an additional column that returns an overall
"row index" from the source.

The row index would start from zero and increment for each row
retrieved from the source, particularly in the case of Parquet files.
Is it currently possible to obtain this row index or would expanding
the Scan node's behavior be required?

Having this row index column would be valuable in implementing support
for Iceberg's position delete files, as outlined in the
following link:

https://iceberg.apache.org/spec/#delete-formats

While Iceberg's value-based (equality) deletes can already be performed
using the existing anti-join support, a projection node cannot be relied
on to number rows, because row ordering is not guaranteed within an Acero
graph. Hence, a dedicated row index column would provide a more reliable
solution in this context.
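
To make the use case concrete, here is a minimal sketch in plain Python
(not Acero or Iceberg code). It assumes a position delete is identified by
a data file path plus a zero-based row position within that file, as in
the spec's file_path and pos columns, and that the rows of a single file
arrive in file order. The function names are hypothetical.

```python
from typing import Dict, Iterable, Iterator, List, Set, Tuple

Row = Dict[str, object]


def with_row_index(
    file_path: str, batches: Iterable[List[Row]]
) -> Iterator[Tuple[str, int, Row]]:
    """Tag every row with (file_path, position); positions start at 0 per file."""
    pos = 0
    for batch in batches:
        for row in batch:
            yield file_path, pos, row
            pos += 1


def anti_join(
    rows: Iterable[Tuple[str, int, Row]], deletes: Set[Tuple[str, int]]
) -> List[Row]:
    """Keep only the rows whose (file_path, position) key is not deleted."""
    return [row for path, pos, row in rows if (path, pos) not in deletes]


# Two batches read from one (hypothetical) data file.
batches = [[{"id": 10}, {"id": 11}], [{"id": 12}]]
deletes = {("a.parquet", 1)}
kept = anti_join(with_row_index("a.parquet", batches), deletes)
```

Without a stable per-file position, the join key in this sketch could not
be built, which is the gap the proposed column is meant to fill.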

Thank you for considering this suggestion.

Rusty

Re: [DISCUSS] Acero's ScanNode and Row Indexing across Scans

Posted by Weston Pace <we...@gmail.com>.
That makes sense!  I can see how masked reads are useful in that kind of
approach too.  Thanks for the explanation.


Re: [DISCUSS] Acero's ScanNode and Row Indexing across Scans

Posted by Will Jones <wi...@gmail.com>.
> The main downside with using the mask (or any solution based on a filter
> node / filtering) is that it requires that the delete indices go into the
> plan itself.  So you need to first read the delete files and then create
> the plan.  And, if there are many deleted rows, this can be costly.

Ah, I see. I was assuming you could load the indices within the fragment
scan, at the same time the page index is read. That's how I'm implementing
it with Lance, and how I plan to implement it with Delta Lake. But if you
can't do that, then filtering with an anti-join makes sense. You wouldn't
want to include the delete indices in a plan.
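
For the anti-join route, the streaming merge-based variant raised earlier
in the thread could look roughly like the sketch below. It is plain Python
rather than Acero code, the function name is hypothetical, and it assumes
both the scanned rows and the delete positions arrive sorted ascending by
row index within a single file.

```python
from typing import Iterable, Iterator, Optional, Tuple, TypeVar

T = TypeVar("T")


def merge_anti_join(
    rows: Iterable[Tuple[int, T]], deleted: Iterable[int]
) -> Iterator[Tuple[int, T]]:
    """Yield (row_index, row) pairs whose index does not appear in `deleted`.

    Both inputs must be sorted ascending by row index. Neither side is
    materialized, so memory use does not grow with the number of deletes.
    """
    delete_iter = iter(deleted)
    current: Optional[int] = next(delete_iter, None)
    for index, row in rows:
        # Skip delete positions that lie behind the current row.
        while current is not None and current < index:
            current = next(delete_iter, None)
        if current is not None and current == index:
            continue  # this row is deleted
        yield index, row


rows = list(enumerate("abcdef"))
survivors = list(merge_anti_join(rows, [1, 4, 4, 9]))
```

Because the delete positions are consumed lazily, they do not need to be
read before the plan is created, which is the cost concern quoted above.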


Re: [DISCUSS] Acero's ScanNode and Row Indexing across Scans

Posted by Weston Pace <we...@gmail.com>.
Also, for clarity, I do agree with Gang that these are both valuable
features in their own right.  A mask makes a lot of sense for page indices.


Re: [DISCUSS] Acero's ScanNode and Row Indexing across Scans

Posted by Weston Pace <we...@gmail.com>.
> then I think the incremental cost of adding the
> positional deletes to the mask is probably lower than the anti-join.

Do you mean developer cost?  Then yes, I agree.  Although there may be some
subtlety in the pushdown to connect a dataset filter to a Parquet reader
filter.

The main downside with using the mask (or any solution based on a filter
node / filtering) is that it requires that the delete indices go into the
plan itself.  So you need to first read the delete files and then create
the plan.  And, if there are many deleted rows, this can be costly.


Re: [DISCUSS] Acero's ScanNode and Row Indexing across Scans

Posted by Will Jones <wi...@gmail.com>.
That's a good point, Gang. To perform deletes, we definitely need the row
index, so we'll want that regardless of whether it's used in scans.

> I'm not sure a mask would be the ideal solution for Iceberg (though it is
> a reasonable feature in its own right) because I think position-based
> deletes, in Iceberg, are still done using an anti-join and not a filter.

For just positional deletes in isolation, I agree the mask wouldn't be
better than the anti-join. But if they end up using the mask for filtering
with the page index, then I think the incremental cost of adding the
positional deletes to the mask is probably lower than the anti-join.
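
As an illustration of that incremental step, the sketch below folds
position deletes into a selection that page pruning has already produced.
It is plain Python, only loosely modeled on the select/skip runs of the
Rust RowSelection type; the run representation and function names are
assumptions made for this example, and a dense boolean list is used purely
for readability.

```python
from typing import Iterable, List, Tuple

Run = Tuple[str, int]  # ("select" or "skip", number of consecutive rows)


def to_runs(keep: Iterable[bool]) -> List[Run]:
    """Collapse a per-row keep/drop mask into alternating select/skip runs."""
    runs: List[Run] = []
    for flag in keep:
        kind = "select" if flag else "skip"
        if runs and runs[-1][0] == kind:
            runs[-1] = (kind, runs[-1][1] + 1)
        else:
            runs.append((kind, 1))
    return runs


def combined_selection(
    num_rows: int,
    page_keep_ranges: Iterable[Tuple[int, int]],
    deleted_positions: Iterable[int],
) -> List[Run]:
    """Rows survive if page pruning kept them AND they are not deleted.

    `page_keep_ranges` holds half-open [start, stop) row ranges that the
    page index could not rule out.
    """
    keep = [False] * num_rows
    for start, stop in page_keep_ranges:
        for i in range(max(start, 0), min(stop, num_rows)):
            keep[i] = True
    for pos in deleted_positions:
        if 0 <= pos < num_rows:
            keep[pos] = False
    return to_runs(keep)


selection = combined_selection(10, [(2, 8)], [3, 4, 9])
```

Here the deletes only clear bits in a mask that already exists, which is
why the extra work is small once the mask is in place.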

On Thu, Jun 1, 2023 at 6:33 PM Gang Wu <us...@gmail.com> wrote:

> IMO, adding a row_index column from the reader is orthogonal to
> the mask implementation. Table formats (e.g. Apache Iceberg and
> Delta) require knowledge of the row index to finalize row deletion. It
> would be trivial to natively support row index from the file reader.
>
> Best,
> Gang
>
> On Fri, Jun 2, 2023 at 3:40 AM Weston Pace <we...@gmail.com> wrote:
>
> > I agree that having a row_index is a good approach.  I'm not sure a mask
> > would be the ideal solution for Iceberg (though it is a reasonable feature
> > in its own right) because I think position-based deletes, in Iceberg, are
> > still done using an anti-join and not a filter.
> >
> > That being said, we probably also want to implement a streaming merge-based
> > anti-join because I believe delete files are ordered by row_index and so a
> > streaming approach is likely to be much more performant.
> >
> > On Mon, May 29, 2023 at 4:01 PM Will Jones <wi...@gmail.com> wrote:
> >
> > > Hi Rusty,
> > >
> > > At first glance, I think adding a row_index column would make sense. To be
> > > clear, this would be an index within a file / fragment, not across multiple
> > > files, which don't necessarily have a known ordering in Acero (IIUC).
> > >
> > > However, another approach would be to take a mask argument in the Parquet
> > > reader. We may wish to do this anyways for support for using predicate
> > > pushdown with Parquet's page index. While Arrow C++ hasn't yet implemented
> > > predicate pushdown on page index (right now just supports row groups),
> > > Arrow Rust has and provides an API to pass in a mask to support it. The
> > > reason for this implementation is described in the blog post "Querying
> > > Parquet with Millisecond Latency" [1], under "Page Pruning". The
> > > RowSelection struct API is worth a look [2].
> > >
> > > I'm not yet sure which would be preferable, but I think adopting a similar
> > > pattern to what the Rust community has done may be wise. It's possible that
> > > row_index is easy to implement while the mask will take time, in which case
> > > row_index makes sense as an interim solution.
> > >
> > > Best,
> > >
> > > Will Jones
> > >
> > > [1] https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
> > > [2] https://docs.rs/parquet/40.0.0/parquet/arrow/arrow_reader/struct.RowSelection.html
> > >

Re: [DISCUSS] Acero's ScanNode and Row Indexing across Scans

Posted by Gang Wu <us...@gmail.com>.
IMO, adding a row_index column from the reader is orthogonal to
the mask implementation. Table formats (e.g. Apache Iceberg and
Delta) need to know the row index to finalize row deletion. It
would be trivial to natively support a row index in the file reader.

Best,
Gang

On Fri, Jun 2, 2023 at 3:40 AM Weston Pace <we...@gmail.com> wrote:

> I agree that having a row_index is a good approach.  I'm not sure a mask
> would be the ideal solution for Iceberg (though it is a reasonable feature
> in its own right) because I think position-based deletes, in Iceberg, are
> still done using an anti-join and not a filter.
>
> That being said, we probably also want to implement a streaming merge-based
> anti-join because I believe delete files are ordered by row_index and so a
> streaming approach is likely to be much more performant.
>
> On Mon, May 29, 2023 at 4:01 PM Will Jones <wi...@gmail.com>
> wrote:
>
> > Hi Rusty,
> >
> > At first glance, I think adding a row_index column would make sense. To be
> > clear, this would be an index within a file / fragment, not across multiple
> > files, which don't necessarily have a known ordering in Acero (IIUC).
> >
> > However, another approach would be to take a mask argument in the Parquet
> > reader. We may wish to do this anyways for support for using predicate
> > pushdown with Parquet's page index. While Arrow C++ hasn't yet implemented
> > predicate pushdown on page index (right now just supports row groups),
> > Arrow Rust has and provides an API to pass in a mask to support it. The
> > reason for this implementation is described in the blog post "Querying
> > Parquet with Millisecond Latency" [1], under "Page Pruning". The
> > RowSelection struct API is worth a look [2].
> >
> > I'm not yet sure which would be preferable, but I think adopting a similar
> > pattern to what the Rust community has done may be wise. It's possible that
> > row_index is easy to implement while the mask will take time, in which case
> > row_index makes sense as an interim solution.
> >
> > Best,
> >
> > Will Jones
> >
> > [1]
> > https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
> > [2]
> > https://docs.rs/parquet/40.0.0/parquet/arrow/arrow_reader/struct.RowSelection.html
> >
> > On Mon, May 29, 2023 at 2:12 PM Rusty Conover <ru...@conover.me.invalid>
> > wrote:
> >
> > > Hi Arrow Team,
> > >
> > > I wanted to suggest an improvement regarding Acero's Scan node.
> > > Currently, it provides useful information such as __fragment_index,
> > > __batch_index, __filename, and __last_in_fragment. However, it would
> > > be beneficial to have an additional column that returns an overall
> > > "row index" from the source.
> > >
> > > The row index would start from zero and increment for each row
> > > retrieved from the source, particularly in the case of Parquet files.
> > > Is it currently possible to obtain this row index or would expanding
> > > the Scan node's behavior be required?
> > >
> > > Having this row index column would be valuable in implementing support
> > > for Iceberg's positional-based delete files, as outlined in the
> > > following link:
> > >
> > > https://iceberg.apache.org/spec/#delete-formats
> > >
> > > While Iceberg's value-based deletes can already be performed using the
> > > support for anti joins, using a projection node does not guarantee the
> > > row ordering within an Acero graph. Hence, the inclusion of a
> > > dedicated row index column would provide a more reliable solution in
> > > this context.
> > >
> > > Thank you for considering this suggestion.
> > >
> > > Rusty
> > >
> >
>

Re: [DISCUSS] Acero's ScanNode and Row Indexing across Scans

Posted by Weston Pace <we...@gmail.com>.
I agree that having a row_index is a good approach.  I'm not sure a mask
would be the ideal solution for Iceberg (though it is a reasonable feature
in its own right) because I think position-based deletes, in Iceberg, are
still done using an anti-join and not a filter.

That being said, we probably also want to implement a streaming merge-based
anti-join because I believe delete files are ordered by row_index and so a
streaming approach is likely to be much more performant.
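
To illustrate the streaming idea, here is a minimal sketch of a merge-based
anti-join in plain Python. It assumes both the scanned rows and the delete
positions arrive sorted ascending by row index within a single file. The
function name merge_anti_join and the (row_index, row) tuple layout are
hypothetical and are not part of Acero.

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def merge_anti_join(
    rows: Iterable[Tuple[int, T]], deleted: Iterable[int]
) -> Iterator[Tuple[int, T]]:
    """Yield (row_index, row) pairs whose row_index is not in `deleted`.

    Assumption: both inputs are sorted ascending by row index, so each
    side is consumed once and nothing has to be buffered or hashed.
    """
    delete_iter = iter(deleted)
    current = next(delete_iter, None)
    for row_index, row in rows:
        # Advance the delete cursor past positions below the current row.
        while current is not None and current < row_index:
            current = next(delete_iter, None)
        if current is not None and current == row_index:
            continue  # this position is marked as deleted
        yield row_index, row


# Usage: rows 1, 3 and 4 are dropped while streaming.
surviving = list(merge_anti_join(enumerate("abcdef"), [1, 3, 4]))
```

Because each input is read exactly once, memory use stays constant no matter
how many delete positions there are, which is the advantage over a hash-based
anti-join.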

On Mon, May 29, 2023 at 4:01 PM Will Jones <wi...@gmail.com> wrote:

> Hi Rusty,
>
> At first glance, I think adding a row_index column would make sense. To be
> clear, this would be an index within a file / fragment, not across multiple
> files, which don't necessarily have a known ordering in Acero (IIUC).
>
> However, another approach would be to take a mask argument in the Parquet
> reader. We may wish to do this anyways for support for using predicate
> pushdown with Parquet's page index. While Arrow C++ hasn't yet implemented
> predicate pushdown on page index (right now just supports row groups),
> Arrow Rust has and provides an API to pass in a mask to support it. The
> reason for this implementation is described in the blog post "Querying
> Parquet with Millisecond Latency" [1], under "Page Pruning". The
> RowSelection struct API is worth a look [2].
>
> I'm not yet sure which would be preferable, but I think adopting a similar
> pattern to what the Rust community has done may be wise. It's possible that
> row_index is easy to implement while the mask will take time, in which case
> row_index makes sense as an interim solution.
>
> Best,
>
> Will Jones
>
> [1]
>
> https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
> [2]
>
> https://docs.rs/parquet/40.0.0/parquet/arrow/arrow_reader/struct.RowSelection.html
>
> On Mon, May 29, 2023 at 2:12 PM Rusty Conover <ru...@conover.me.invalid>
> wrote:
>
> > Hi Arrow Team,
> >
> > I wanted to suggest an improvement regarding Acero's Scan node.
> > Currently, it provides useful information such as __fragment_index,
> > __batch_index, __filename, and __last_in_fragment. However, it would
> > be beneficial to have an additional column that returns an overall
> > "row index" from the source.
> >
> > The row index would start from zero and increment for each row
> > retrieved from the source, particularly in the case of Parquet files.
> > Is it currently possible to obtain this row index or would expanding
> > the Scan node's behavior be required?
> >
> > Having this row index column would be valuable in implementing support
> > for Iceberg's positional-based delete files, as outlined in the
> > following link:
> >
> > https://iceberg.apache.org/spec/#delete-formats
> >
> > While Iceberg's value-based deletes can already be performed using the
> > support for anti joins, using a projection node does not guarantee the
> > row ordering within an Acero graph. Hence, the inclusion of a
> > dedicated row index column would provide a more reliable solution in
> > this context.
> >
> > Thank you for considering this suggestion.
> >
> > Rusty
> >
>

Re: [DISCUSS] Acero's ScanNode and Row Indexing across Scans

Posted by Will Jones <wi...@gmail.com>.
Hi Rusty,

At first glance, I think adding a row_index column would make sense. To be
clear, this would be an index within a file / fragment, not across multiple
files, which don't necessarily have a known ordering in Acero (IIUC).
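
A minimal sketch of what a per-file row index means, in plain Python rather
than Acero. The function with_row_index and the dict-of-batches input are
hypothetical stand-ins for fragments and record batches. The point is only
that the counter restarts for every file and carries across batches within
the same file.

```python
from typing import Dict, Iterator, List, Tuple


def with_row_index(
    fragments: Dict[str, List[List[str]]]
) -> Iterator[Tuple[str, int, str]]:
    """Yield (filename, row_index, row) for every row in every fragment.

    `fragments` maps a filename to its batches; each batch is a list of rows.
    """
    for filename, batches in fragments.items():
        offset = 0  # the index is per file, so it restarts here
        for batch in batches:
            for i, row in enumerate(batch):
                yield filename, offset + i, row
            # Later batches of the same file continue where this one ended.
            offset += len(batch)


# Usage: two batches in the first file, one batch in the second.
indexed = list(
    with_row_index({"a.parquet": [["x", "y"], ["z"]], "b.parquet": [["w"]]})
)
```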

However, another approach would be to take a mask argument in the Parquet
reader. We may wish to do this anyway, to support predicate pushdown with
Parquet's page index. While Arrow C++ hasn't yet implemented predicate
pushdown on the page index (right now it supports only row groups),
Arrow Rust has, and it provides an API to pass in a mask to support it. The
reason for this implementation is described in the blog post "Querying
Parquet with Millisecond Latency" [1], under "Page Pruning". The
RowSelection struct API is worth a look [2].
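
As a rough illustration of the mask idea, the sketch below turns a list of
deleted row positions into alternating select/skip runs. It is only loosely
modeled on the run-based shape of RowSelection; the function
positions_to_runs and the ("select" | "skip", count) tuples are hypothetical
and are not the Rust API or anything in Arrow C++.

```python
from typing import Iterable, List, Tuple

Run = Tuple[str, int]  # ("select" or "skip", number of consecutive rows)


def positions_to_runs(deleted: Iterable[int], total_rows: int) -> List[Run]:
    """Convert deleted row positions into select/skip runs covering the file."""
    runs: List[Run] = []

    def push(kind: str, count: int) -> None:
        if count == 0:
            return
        if runs and runs[-1][0] == kind:
            # Merge with the previous run of the same kind.
            runs[-1] = (kind, runs[-1][1] + count)
        else:
            runs.append((kind, count))

    cursor = 0  # first row not yet covered by a run
    for pos in sorted(set(deleted)):
        if pos >= total_rows:
            break  # ignore positions past the end of the file
        push("select", pos - cursor)  # rows kept before this deletion
        push("skip", 1)
        cursor = pos + 1
    push("select", total_rows - cursor)  # trailing kept rows
    return runs


# Usage: a 6-row file with rows 1, 3 and 4 deleted.
runs = positions_to_runs([1, 3, 4], total_rows=6)
```

A reader that accepts runs like these can skip decoding the deleted ranges,
whereas a row_index column leaves the filtering to a later node in the plan.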

I'm not yet sure which would be preferable, but I think adopting a similar
pattern to what the Rust community has done may be wise. It's possible that
row_index is easy to implement while the mask will take time, in which case
row_index makes sense as an interim solution.

Best,

Will Jones

[1]
https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
[2]
https://docs.rs/parquet/40.0.0/parquet/arrow/arrow_reader/struct.RowSelection.html

On Mon, May 29, 2023 at 2:12 PM Rusty Conover <ru...@conover.me.invalid>
wrote:

> Hi Arrow Team,
>
> I wanted to suggest an improvement regarding Acero's Scan node.
> Currently, it provides useful information such as __fragment_index,
> __batch_index, __filename, and __last_in_fragment. However, it would
> be beneficial to have an additional column that returns an overall
> "row index" from the source.
>
> The row index would start from zero and increment for each row
> retrieved from the source, particularly in the case of Parquet files.
> Is it currently possible to obtain this row index or would expanding
> the Scan node's behavior be required?
>
> Having this row index column would be valuable in implementing support
> for Iceberg's positional-based delete files, as outlined in the
> following link:
>
> https://iceberg.apache.org/spec/#delete-formats
>
> While Iceberg's value-based deletes can already be performed using the
> support for anti joins, using a projection node does not guarantee the
> row ordering within an Acero graph. Hence, the inclusion of a
> dedicated row index column would provide a more reliable solution in
> this context.
>
> Thank you for considering this suggestion.
>
> Rusty
>