Posted to dev@arrow.apache.org by Adam Lippai <ad...@rigo.sk> on 2022/04/12 19:51:19 UTC

Remote datasets

Hi,

I saw really nice features like groupby and join developed recently.
I like how Dataset is supported for joins and how streamed processing is
gaining momentum in Arrow.

Does Apache Arrow have the concept of remote datasets, e.g. using Arrow
Flight? Or will this happen directly using S3 and other protocols only? I
know some work has started in Substrait, but that might be a whole new
level of integration, hence my question focusing on data first.

I was trying to browse the JIRA issues, but the future picture wasn't clear
based on that.

Best regards,
Adam Lippai

Re: Remote datasets

Posted by Weston Pace <we...@gmail.com>.
I'll add my perspective (which hopefully doesn't confuse things more).
I think the fragment concept is a little too specific and the key
abstraction here is "stream of homogeneous record batches".  This
manifests in a few different flavors (synchronous, asynchronous,
push/pull) but we have some general rules to switch between them so
they are more or less interchangeable.  By far the most common and
public variation is the RecordBatchReader, which is a synchronous,
pull-based "stream of record batches".  Notably, there is also the C stream
interface[1] which allows a "stream of record batches" to bridge
libraries and/or languages.

The execution engine in Arrow can be thought of as distinct from
datasets.  Indeed, it can be compiled today without the datasets
module.  Inputs are "streams of record batches" and the output is a
"stream of record batches".  The execution plan then specifies what
compute operations should be applied to the data as it flows through.
From that perspective a remote dataset is no problem since it can be
easily converted to a stream of record batches.

The datasets module really provides two different pieces of
functionality, the dataset/scanner, and a lightweight query API.

The dataset is a way to describe a collection of independently
scannable fragments.  Fragments can be anything that produces (on
request) a "stream of record batches" and there are a lot of utilities
in the datasets module for discovering fragments from filesystems
(including remote filesystems), managing the order of fragments,
attaching basic fragment information (e.g. filename) to outgoing
batches, etc.  The key distinction between a fragment and a "stream of
batches" is that a fragment describes how to produce a stream of
batches.  The scanner is then able to take all these independently
scannable fragments and scan them (usually in parallel).  The scanner
also needs to deal with the fact that many of these fragments may have
different schemas.  The output of the scanner is a "stream of record
batches" (with a homogeneous schema) and so it makes a good input to
the execution engine.

Parts of the dataset API overlap with Substrait and they are somewhat
interchangeable.  A Substrait ReadRel with a LocalFiles table is very
similar to a FileSystemDataset.  More generally, the Substrait
"ReadRel" and the "dataset" have the same goal, to define a scannable
source of data with instructions on how to scan it.

The datasets module also has a lightweight query API which has some
overlap with Substrait.  In pyarrow you might say
`dataset.to_table(columns={"double_x": ds.field("x") * 2},
filter=ds.field("y") == 3)` which is going to create an execution plan
with a scan node, filter node, and project node.  This could also be
expressed as a Substrait plan with a ReadRel, ProjectRel, and
FilterRel (or in Substrait you might actually push the projection and
filtering entirely into the ReadRel).

So, to summarize, the execution plan itself only really depends on
these two abstractions:
 * ExecPlan, describes what compute operations take place
   * Normally constructed via pyarrow's datasets, R's dplyr, or Substrait
 * "Stream of record batches"
   * Normally constructed via pyarrow's datasets or Substrait's ReadRel

As long as your use case can describe its compute with an ExecPlan and
can produce some kind of stream of record batches then you should be able
to use the execution engine (e.g. filter, project, join, and
aggregate).


Re: Remote datasets

Posted by Adam Lippai <ad...@rigo.sk>.
Hi David,

This is a perfect answer. I was looking for the Fragment concept and the
issues you linked make it easy to follow.
I understand this is a really hard field with a ton of work: getting
chunking, prefetch and backpressure right, plus adding filter predicate and
other computation pushdown, is an infinitely complex task.

Thank you for making this clear. You are making such great progress that
it's hard to keep up even with the big picture :D

Best regards,
Adam Lippai


Re: Remote datasets

Posted by David Li <li...@apache.org>.
TL;DR yes, if and when all is said and done.

Breaking this down…

Substrait isn't really relevant here. It's a way to serialize a query in a way that's agnostic to whatever's actually generating or executing the query.

But if you have a Substrait plan, that can get converted by the Arrow C++ Query Engine into its internal "ExecPlan" for execution, which is what's actually implementing the joins, aggregations, etc. This engine operates in a streaming fashion, so your application can take the data you get out and use it with a Flight service/client, yes.

The query engine pulls input from the Arrow Datasets library. (Though while I speak of them separately, really, they are intertwined.) Datasets is also a streaming interface to read Arrow data from various underlying datasources, implementing things like projection pushdown and partitioning where possible. This is agnostic to whether the data is local or remote, i.e. there's no explicit concept of "remote dataset". It's all datasets whether it's in memory, on local disk, or across the network.

So if/when a Flight datasource ("Fragment") is implemented for Arrow Datasets, this will be consumed in a streaming fashion, by a query engine which itself is streaming, which can be fed into a streaming interface like Flight. There's a good amount of work to do to ensure this all works well together (e.g. ensuring backpressure gets reflected across all these layers), but what you are asking for is in principle doable, if not quite yet implemented.

-David


Re: Remote datasets

Posted by Adam Lippai <ad...@rigo.sk>.
Hi James,

Your answer helps, yes.
My question is whether I will be able to join two datasets (producing a new
dataset) in a streaming way or do I have to fetch the whole response and
keep it in memory?
So if my local node has memory constraints, will it be able to stream data
from an Arrow Flight datasource and stream it back to a different Arrow
Flight target?
If the answer is yes, is it because there will be a Remote Dataset concept
or will it use "distributed computing" using Substrait?

Best regards,
Adam Lippai


Re: Remote datasets

Posted by James Duong <ja...@bitquilltech.com.INVALID>.
Hi Adam,

Arrow Flight can be used to provide an RPC framework that returns datasets
(sent over the wire as arrow buffers) and exposes them from a FlightClient
as Arrow RecordBatches without serialization. Is this what you mean by
remote datasets?
Arrow Flight SQL is an application layer built on top of Arrow Flight that
standardizes remote execution of SQL queries, getting catalog information,
getting SQL capabilities, and other access-related concepts. Arrow Flight
SQL is intended to provide a universal user-facing front end for existing
SQL-capable database engines.

Neither is really intended for computation, just remote access.



-- 

*James Duong*
Lead Software Developer
Bit Quill Technologies Inc.
Direct: +1.604.562.6082 | jamesd@bitquilltech.com
https://www.bitquilltech.com

This email message is for the sole use of the intended recipient(s) and may
contain confidential and privileged information.  Any unauthorized review,
use, disclosure, or distribution is prohibited.  If you are not the
intended recipient, please contact the sender by reply email and destroy
all copies of the original message.  Thank you.

Re: Remote datasets

Posted by David Li <li...@apache.org>.
Hey Adam, 

Good question. There are outstanding JIRAs to integrate Flight [1] and HTTP/FTP [2] into Datasets/Filesystems. There are also some JIRAs about various RDBMSes [3] that could perhaps also be viewed through a Datasets lens.

Note that this work all proceeds in layers, e.g. it's the C++ query engine implementing groupby/join. The work here would be to integrate things into either the C++ Datasets or Filesystems frameworks as appropriate (e.g., create client libraries for RDBMSes and integrate those into Datasets, or implement the appropriate Datasets interfaces to wrap Flight types) and those would then be picked up by the query engine. Anything in Substrait can proceed in parallel. 

[1]: https://issues.apache.org/jira/browse/ARROW-10524
[2]: https://issues.apache.org/jira/browse/ARROW-7594
[3]: https://issues.apache.org/jira/browse/ARROW-11670

-David
