Posted to dev@arrow.apache.org by Li Jin <ic...@gmail.com> on 2022/08/31 21:09:47 UTC

Integration between Flight and Acero

Hello!

I have recently started to look into integrating Flight RPC with Acero
source/sink node.

In Flight, the life cycle of a "read" request looks something like:

   - User specifies a URL (e.g. my_storage://my_path) and parameters (e.g.,
   begin = "20220101", end = "20220201")
   - Client issues GetFlightInfo and gets a FlightInfo from the server
   - Client issues DoGet with a ticket from the FlightInfo and gets a stream reader
   - Client calls Next until the stream is exhausted
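
As a rough sketch of the steps above (not real Flight code): every type below
is a hypothetical stand-in written only for illustration, so that the snippet
compiles on its own. In Arrow Flight itself, DoGet takes the Ticket carried by
a FlightEndpoint inside the FlightInfo, and the sketch mirrors that shape.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; not the arrow::flight API.
struct Ticket { std::string token; };
struct Endpoint { Ticket ticket; };
struct Info { std::vector<Endpoint> endpoints; };
typedef std::vector<int> Batch;  // a "record batch" is just a vector of rows here

class StreamReader {
 public:
  explicit StreamReader(std::vector<Batch> batches)
      : batches_(std::move(batches)), pos_(0) {}

  // Copies the next batch into *out; returns false once the stream is exhausted.
  bool Next(Batch* out) {
    if (pos_ >= batches_.size()) return false;
    *out = batches_[pos_++];
    return true;
  }

 private:
  std::vector<Batch> batches_;
  std::size_t pos_;
};

class FakeClient {
 public:
  // Metadata call: turns URL + parameters into endpoints; reads no data.
  Info GetFlightInfo(const std::string& url, const std::string& begin,
                     const std::string& end) const {
    Endpoint endpoint;
    endpoint.ticket.token = url + "?" + begin + "-" + end;
    Info info;
    info.endpoints.push_back(endpoint);
    return info;
  }

  // Data call: opens a stream for one ticket (canned data in this sketch).
  StreamReader DoGet(const Ticket& ticket) const {
    (void)ticket;
    std::vector<Batch> batches;
    batches.push_back(Batch(2, 0));  // a batch with 2 rows
    batches.push_back(Batch(1, 0));  // a batch with 1 row
    return StreamReader(batches);
  }
};

// Walks the full lifecycle and returns the total number of rows read.
std::size_t ReadAll(const FakeClient& client, const std::string& url,
                    const std::string& begin, const std::string& end) {
  std::size_t rows = 0;
  const Info info = client.GetFlightInfo(url, begin, end);
  for (std::size_t i = 0; i < info.endpoints.size(); ++i) {
    StreamReader reader = client.DoGet(info.endpoints[i].ticket);
    Batch batch;
    while (reader.Next(&batch)) rows += batch.size();
  }
  return rows;
}
```

The point of the sketch is only the ordering: one metadata call, then one
stream per endpoint, then a pull loop until the stream reports exhaustion.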

My question is, how does the above life cycle fit into an Acero node? In
other words, what are the proper places in the Acero node lifecycle to issue
the corresponding Flight RPCs?

Appreciate any thoughts,
Li

Re: Integration between Flight and Acero

Posted by Li Jin <ic...@gmail.com>.
Thanks both for the suggestions; they make sense.

I will try SourceNode with the factory method first because my
service/client API doesn't support parallel reads yet. (Parallel reading
while preserving data ordering via the Flight protocol is something I have
thought about a little, but it is probably something to solve later.)

Li


Re: Integration between Flight and Acero

Posted by Weston Pace <we...@gmail.com>.
Yes.  If you need the source node to read in parallel OR if you have
multiple fragments (especially if those fragments don't have identical
schemas) then you want a dataset and not just a plain source node.

On Tue, Sep 13, 2022 at 1:55 PM David Li <li...@apache.org> wrote:
>
> Yeah, I concur with Weston.
>
> > To start with I think a custom factory function will be sufficient
> > (e.g. look at MakeScanNode in scanner.cc for an example).  So the
> > options would somehow describe the coordinates of the flight endpoint.
>
> These 'coordinates' would be a FlightDescriptor.
>
> > However, it might be nice if "open a connection to the flight
> > endpoint" happened during the call to StartProducing and not during
> > the factory function call.  This could maybe be a follow-up task.
>
> The factory would call GetFlightInfo (or maybe GetSchema, from what it sounds like) to get the schema, but this wouldn't read any data. StartProducing would then call DoGet to actually read the data.
>
> ---
>
> I suggested adapting Flight to Dataset, assuming this matches the semantics of your service, because it encapsulates these steps while reusing all the machinery we already have:
>
> - Dataset discovery naturally becomes GetFlightInfo. (Semantically, this is like beginning execution of a query, and returns one or more partitions where the result set can be read.)
> - Those partitions then each become a Fragment, and then they can be read in parallel by Dataset.
>
> It sounds like the service in question here isn't quite that complex, though, so no need to necessarily go that far.
>
> On Tue, Sep 13, 2022, at 19:18, Weston Pace wrote:
> >> The alternative path of subclassing SourceNode and having ExecNode::Init or
> >> ExecNode::StartProducing seems quite a bit of change (also I don't think
> >> SourceNode is exposed via public header). But let me know if you think I am
> >> missing something.
> >
> > Agreed that we don't want to go this route.  David's suggestion is a
> > good idea.  However, this shouldn't be the responsibility of the
> > caller exactly.
> >
> > In other words (and my lack of detailed knowledge about flight is
> > probably going to leak here) there should still be a factory function
> > (e.g. "flight_source" or something like that) and a custom options
> > object (FlightSourceOptions).
> >
> > To start with I think a custom factory function will be sufficient
> > (e.g. look at MakeScanNode in scanner.cc for an example).  So the
> > options would somehow describe the coordinates of the flight endpoint.
> > The factory function would open a connection to the flight endpoint
> > and convert this into a record batch reader.  Then it would create one
> > of the nodes that Yaron has contributed and return that.
> >
> > However, it might be nice if "open a connection to the flight
> > endpoint" happened during the call to StartProducing and not during
> > the factory function call.  This could maybe be a follow-up task.
> > Perhaps source node could change so that, instead of accepting an
> > AsyncGenerator, it accepts an AsyncGenerator factory function.  Then
> > it could execute that function during the call to StartProducing.
> >
> > On Tue, Sep 13, 2022 at 4:05 PM Li Jin <ic...@gmail.com> wrote:
> >>
> >> Thanks Yaron for the pointer to that PR.
> >>
> >> On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili <rt...@hotmail.com> wrote:
> >>
> >> > If you can wrap the flight reader as a RecordBatchReader, then another
> >> > possibility is using an upcoming PR (
> >> > https://github.com/apache/arrow/pull/14041) that enables SourceNode to
> >> > accept it. You would need to know the schema when configuring the
> >> > SourceNode, but you won't need to derive from SourceNode.
> >> >
> >> >
> >> > Yaron.
> >> > ________________________________
> >> > From: Li Jin <ic...@gmail.com>
> >> > Sent: Tuesday, September 13, 2022 3:58 PM
> >> > To: dev@arrow.apache.org <de...@arrow.apache.org>
> >> > Subject: Re: Integration between Flight and Acero
> >> >
> >> > Update:
> >> >
> >> > I am going to try what David Li suggested here:
> >> > https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v
> >> >
> >> > This seems to be the least amount of code. This does require calling
> >> > "DoGet" at Acero plan/node creation time rather than execution time but I
> >> > don't think it's a big deal for now.
> >> >
> >> > The alternative path of subclassing SourceNode and having ExecNode::Init or
> >> > ExecNode::StartProducing seems quite a bit of change (also I don't think
> >> > SourceNode is exposed via public header). But let me know if you think I am
> >> > missing something.
> >> >
> >> > Li
> >> >
> >> > On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >> >
> >> > > Hi Li,
> >> > >
> >> > > Here's my 2 cents about the Ibis/Substrait part of this.
> >> > >
> >> > > An Ibis expression carries a schema. If you're planning to create an
> >> > > integrated Ibis/Substrait/Arrow solution, then you'll need the schema to
> >> > > be available to Ibis in Python. So, you'll need a Python wrapper for the
> >> > > C++ implementation you have in mind for the GetSchema method. I think you
> >> > > should pass the schema obtained by (the wrapped) GetSchema to an Ibis
> >> > > node, rather than defining a new Ibis node that would have to access the
> >> > > network to get the schema on its own.
> >> > >
> >> > > Given the above, I agree with you that when the Acero node is created its
> >> > > schema would already be known.
> >> > >
> >> > >
> >> > > Yaron.
> >> > > ________________________________
> >> > > From: Li Jin <ic...@gmail.com>
> >> > > Sent: Thursday, September 1, 2022 2:49 PM
> >> > > To: dev@arrow.apache.org <de...@arrow.apache.org>
> >> > > Subject: Re: Integration between Flight and Acero
> >> > >
> >> > > Thanks David. I think my original question might not have been accurate,
> >> > > so I will try to rephrase it:
> >> > >
> >> > > My ultimate goal is to add an ibis source node:
> >> > >
> >> > > class MyStorageTable(ibis.TableNode, sch.HasSchema):
> >> > >     url = ... # e.g. "my_storage://my_path"
> >> > >     begin = ... # e.g. "20220101"
> >> > >     end = ... # e.g. "20220201"
> >> > >
> >> > > and pass it to Acero and have Acero create a source node that knows how
> >> > > to read from my_storage. Currently, I have a C++ class that knows how to
> >> > > read/write data and looks like this:
> >> > >
> >> > > class MyStorageClient {
> >> > >  public:
> >> > >   /// \brief Construct a client
> >> > >   MyStorageClient(const std::string& service_location);
> >> > >
> >> > >   /// \brief Read data from a table as a stream
> >> > >   /// \param[in] table_uri
> >> > >   /// \param[in] start_time The start time (inclusive), e.g., '20100101'
> >> > >   /// \param[in] end_time The end time (exclusive), e.g., '20100110'
> >> > >   arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>>
> >> > >   ReadStream(const std::string& table_uri, const std::string& start_time,
> >> > >              const std::string& end_time);
> >> > >
> >> > >   /// \brief Write data to a table as a stream
> >> > >   ///
> >> > >   /// This method will return a FlightStreamWriter that can be used for
> >> > >   /// streaming data into the table.
> >> > >   /// \param[in] table_uri
> >> > >   /// \param[in] schema The schema of the data to write
> >> > >   /// \param[in] start_time The start time (inclusive), e.g., '20100101'
> >> > >   /// \param[in] end_time The end time (exclusive), e.g., '20100110'
> >> > >   arrow::Result<DoPutResult> WriteStream(
> >> > >       const std::string& table_uri,
> >> > >       const std::shared_ptr<arrow::Schema>& schema,
> >> > >       const std::string& start_time, const std::string& end_time);
> >> > >
> >> > >   /// \brief Get schema of a table.
> >> > >   /// \param[in] table_uri The Smooth table name, e.g.,
> >> > >   /// smooth:/research/user/ljin/test
> >> > >   arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(
> >> > >       const std::string& table_uri);
> >> > > };
> >> > >
> >> > > I think an Acero node's schema must be known when the node is created, so
> >> > > I'd imagine I would implement a MyStorageExecNode that gets created by
> >> > > SubstraitConsumer (via some registration mechanism in SubstraitConsumer):
> >> > >
> >> > > (1) GetSchema is called in SubstraitConsumer when creating the node
> >> > > (network call to the storage backend to get the schema)
> >> > > (2) ReadStream is called in either ExecNode::Init or
> >> > > ExecNode::StartProducing to create the FlightStreamReader
> >> > > (3) Some thread (either the plan's execution thread or a thread owned
> >> > > by MyStorageExecNode) will read from the FlightStreamReader and send
> >> > > data downstream.
> >> > >
> >> > > Does that sound like the right approach or is there some other way I
> >> > > should do this?
> >> > >
> >> > > On Wed, Aug 31, 2022 at 6:16 PM David Li <li...@apache.org> wrote:
> >> > >
> >> > > > Hi Li,
> >> > > >
> >> > > > It'd depend on how exactly you expect everything to fit together, and I
> >> > > > think the way you'd go about it would depend on what exactly the
> >> > > > application is. For instance, you could have the application code do
> >> > > > everything up through DoGet and get a reader, then create a SourceNode
> >> > > > from the reader and continue from there.
> >> > > >
> >> > > > Otherwise, I would think the way to go would be to be able to create a
> >> > > > node from a FlightDescriptor (which would contain the URL/parameters in
> >> > > > your example). In that case, I think it'd fit into Arrow Dataset, under
> >> > > > ARROW-10524 [1]. I'd equate GetFlightInfo to dataset discovery, and
> >> > > > each FlightEndpoint in the FlightInfo to a Fragment. As a bonus,
> >> > > > there's already good integration between Dataset and Acero, and this
> >> > > > should naturally do things like read the FlightEndpoints in parallel
> >> > > > with readahead and so on.
> >> > > >
> >> > > > That means: you'd start with the FlightDescriptor, and create a Dataset
> >> > > > from it. This will call GetFlightInfo under the hood. (There's a minor
> >> > > > catch here: this assumes the service that returns the FlightInfo can
> >> > > > embed an accurate schema into it. If that's not true, there'll have to
> >> > > > be some finagling with various ways of getting the actual schema,
> >> > > > depending on what exactly your service supports.) Once you have a
> >> > > > Dataset, you can create an ExecPlan and proceed like normal.
> >> > > >
> >> > > > Of course, if you then want to get things into Python, R, Substrait,
> >> > > > etc... that requires some more work - especially for Substrait where
> >> > > > I'm not sure how best to encode a custom source like that.
> >> > > >
> >> > > > [1]: https://issues.apache.org/jira/browse/ARROW-10524
> >> > > >
> >> > > > -David
> >> > > >
> >> > > > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> >> > > > > Hello!
> >> > > > >
> >> > > > > I have recently started to look into integrating Flight RPC with
> >> > > > > Acero source/sink node.
> >> > > > >
> >> > > > > In Flight, the life cycle of a "read" request looks something like:
> >> > > > >
> >> > > > >    - User specifies a URL (e.g. my_storage://my_path) and parameters
> >> > > > >    (e.g., begin = "20220101", end = "20220201")
> >> > > > >    - Client issues GetFlightInfo and gets a FlightInfo from the server
> >> > > > >    - Client issues DoGet with a ticket from the FlightInfo and gets a
> >> > > > >    stream reader
> >> > > > >    - Client calls Next until the stream is exhausted
> >> > > > >
> >> > > > > My question is, how does the above life cycle fit into an Acero node?
> >> > > > > In other words, what are the proper places in the Acero node lifecycle
> >> > > > > to issue the corresponding Flight RPCs?
> >> > > > >
> >> > > > > Appreciate any thoughts,
> >> > > > > Li
> >> > > >
> >> > >
> >> >

Re: Integration between Flight and Acero

Posted by David Li <li...@apache.org>.
Yeah, I concur with Weston. 

> To start with I think a custom factory function will be sufficient
> (e.g. look at MakeScanNode in scanner.cc for an example).  So the
> options would somehow describe the coordinates of the flight endpoint.

These 'coordinates' would be a FlightDescriptor. 

> However, it might be nice if "open a connection to the flight
> endpoint" happened during the call to StartProducing and not during
> the factory function call.  This could maybe be a follow-up task.

The factory would call GetFlightInfo (or maybe GetSchema, from what it sounds like) to get the schema, but this wouldn't read any data. StartProducing would then call DoGet to actually read the data.
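
To make that split concrete, here is a minimal sketch, assuming a node that is handed a callable instead of an already-open stream. DeferredSourceNode, MakeFlightSource and the Schema/Batch typedefs are hypothetical names invented for this example; they are not Acero or Flight classes, and the lambda only stands in for a DoGet call.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; not Acero or Flight classes.
typedef std::vector<std::string> Schema;  // column names only
typedef std::vector<int> Batch;           // a "record batch" of rows
typedef std::vector<Batch> BatchStream;   // pretend this is an open DoGet stream

class DeferredSourceNode {
 public:
  // The schema is known at construction; the stream is only a callable so far.
  DeferredSourceNode(Schema schema, std::function<BatchStream()> open_stream)
      : schema_(std::move(schema)), open_stream_(std::move(open_stream)) {}

  const Schema& output_schema() const { return schema_; }

  // The data-bearing call happens here, not in the factory.
  // Returns the number of rows produced.
  int StartProducing() {
    int rows = 0;
    const BatchStream stream = open_stream_();
    for (std::size_t i = 0; i < stream.size(); ++i) {
      rows += static_cast<int>(stream[i].size());
    }
    return rows;
  }

 private:
  Schema schema_;
  std::function<BatchStream()> open_stream_;
};

// Factory: a cheap metadata step now, the stream open deferred until later.
// do_get_calls counts how often the stand-in for DoGet runs.
DeferredSourceNode MakeFlightSource(int* do_get_calls) {
  Schema schema;  // stands in for the result of GetFlightInfo/GetSchema
  schema.push_back("time");
  schema.push_back("value");
  return DeferredSourceNode(schema, [do_get_calls]() -> BatchStream {
    ++*do_get_calls;  // stands in for DoGet
    BatchStream stream;
    stream.push_back(Batch(3, 0));
    stream.push_back(Batch(1, 0));
    return stream;
  });
}
```

With this shape, building the plan costs one metadata round trip, and nothing is streamed if the plan is never started.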

---

I suggested adapting Flight to Dataset, assuming this matches the semantics of your service, because it encapsulates these steps while reusing all the machinery we already have:

- Dataset discovery naturally becomes GetFlightInfo. (Semantically, this is like beginning execution of a query, and returns one or more partitions where the result set can be read.)
- Those partitions then each become a Fragment, and then they can be read in parallel by Dataset.
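
A rough sketch of that mapping, under the assumption that one fragment wraps one endpoint: FlightDataset, FlightFragment and the other names are hypothetical and are not the arrow::dataset classes. The scan here is sequential to keep the example small; since each fragment owns its own endpoint, a scheduler could run the scans concurrently.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; not arrow::dataset or arrow::flight.
struct Endpoint {
  std::string ticket;
  std::size_t rows;  // how many rows the fake server returns for this ticket
};
struct FlightInfoLike { std::vector<Endpoint> endpoints; };

// One fragment per endpoint; each can be scanned independently of the others.
class FlightFragment {
 public:
  explicit FlightFragment(Endpoint endpoint) : endpoint_(std::move(endpoint)) {}

  // Stands in for DoGet on this endpoint's ticket; returns the rows read.
  std::size_t Scan() const { return endpoint_.rows; }

 private:
  Endpoint endpoint_;
};

class FlightDataset {
 public:
  // "Discovery": the result of one GetFlightInfo-like call becomes fragments.
  explicit FlightDataset(const FlightInfoLike& info) {
    for (std::size_t i = 0; i < info.endpoints.size(); ++i) {
      fragments_.push_back(FlightFragment(info.endpoints[i]));
    }
  }

  const std::vector<FlightFragment>& fragments() const { return fragments_; }

 private:
  std::vector<FlightFragment> fragments_;
};

// Scans every fragment in turn and returns the total row count.
std::size_t CountRows(const FlightDataset& dataset) {
  std::size_t total = 0;
  for (std::size_t i = 0; i < dataset.fragments().size(); ++i) {
    total += dataset.fragments()[i].Scan();
  }
  return total;
}
```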

It sounds like the service in question here isn't quite that complex, though, so no need to necessarily go that far.

On Tue, Sep 13, 2022, at 19:18, Weston Pace wrote:
>> The alternative path of subclassing SourceNode and having ExecNode::Init or
>> ExecNode::StartProducing seems quite a bit of change (also I don't think
>> SourceNode is exposed via public header). But let me know if you think I am
>> missing something.
>
> Agreed that we don't want to go this route.  David's suggestion is a
> good idea.  However, this shouldn't be the responsibility of the
> caller exactly.
>
> In other words (and my lack of detailed knowledge about flight is
> probably going to leak here) there should still be a factory function
> (e.g. "flight_source" or something like that) and a custom options
> object (FlightSourceOptions).
>
> To start with I think a custom factory function will be sufficient
> (e.g. look at MakeScanNode in scanner.cc for an example).  So the
> options would somehow describe the coordinates of the flight endpoint.
> The factory function would open a connection to the flight endpoint
> and convert this into a record batch reader.  Then it would create one
> of the nodes that Yaron has contributed and return that.
>
> However, it might be nice if "open a connection to the flight
> endpoint" happened during the call to StartProducing and not during
> the factory function call.  This could maybe be a follow-up task.
> Perhaps source node could change so that, instead of accepting an
> AsyncGenerator, it accepts an AsyncGenerator factory function.  Then
> it could execute that function during the call to StartProducing.
>
> On Tue, Sep 13, 2022 at 4:05 PM Li Jin <ic...@gmail.com> wrote:
>>
>> Thanks Yaron for the pointer to that PR.
>>
>> On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili <rt...@hotmail.com> wrote:
>>
>> > If you can wrap the flight reader as a RecordBatchReader, then another
>> > possibility is using an upcoming PR (
>> > https://github.com/apache/arrow/pull/14041) that enables SourceNode to
>> > accept it. You would need to know the schema when configuring the
>> > SourceNode, but you won't need to derive from SourceNode.
>> >
>> >
>> > Yaron.
>> > ________________________________
>> > From: Li Jin <ic...@gmail.com>
>> > Sent: Tuesday, September 13, 2022 3:58 PM
>> > To: dev@arrow.apache.org <de...@arrow.apache.org>
>> > Subject: Re: Integration between Flight and Acero
>> >
>> > Update:
>> >
>> > I am going to try what David Li suggested here:
>> > https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v
>> >
>> > This seems to be the least amount of code. This does require calling
>> > "DoGet" at Acero plan/node creation time rather than execution time but I
>> > don't think it's a big deal for now.
>> >
>> > The alternative path of subclassing SourceNode and having ExecNode::Init or
>> > ExecNode::StartProducing seems quite a bit of change (also I don't think
>> > SourceNode is exposed via public header). But let me know if you think I am
>> > missing something.
>> >
>> > Li
>> >
>> > On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote:
>> >
>> > > Hi Li,
>> > >
>> > > Here's my 2 cents about the Ibis/Substrait part of this.
>> > >
>> > > An Ibis expression carries a schema. If you're planning to create an
>> > > integrated Ibis/Substrait/Arrow solution, then you'll need the schema to
>> > be
>> > > available to Ibis in Python. So, you'll need a Python wrapper for the C++
>> > > implementation you have in mind for the GetSchema method. I think you
>> > > should pass the schema obtained by (the wrapped) GetSchema to an Ibis
>> > node,
>> > > rather than defining a new Ibis node that would have to access the
>> > network
>> > > to get the schema on its own.
>> > >
>> > > Given the above, I agree with you that when the Acero node is created its
>> > > schema would already be known.
>> > >
>> > >
>> > > Yaron.
>> > > ________________________________
>> > > From: Li Jin <ic...@gmail.com>
>> > > Sent: Thursday, September 1, 2022 2:49 PM
>> > > To: dev@arrow.apache.org <de...@arrow.apache.org>
>> > > Subject: Re: Integration between Flight and Acero
>> > >
>> > > Thanks David. I think my original question might not have been accurate
>> > so
>> > > I will try to rephrase my question:
>> > >
>> > > My ultimate goal is to add an ibis source node:
>> > >
>> > > class MyStorageTable(ibis.TableNode, sch.HasSchema):
>> > >     url = ... # e.g. "my_storage://my_path"
>> > >     begin = ... # e.g. "20220101"
>> > >     end = ... # e.g. "20220201"
>> > >
>> > > and pass it to Acero and have Acero create a source node that knows how
>> > to
>> > > read from my_storage. Currently, I have a C++ class that looks like this
>> > > that knows how to read/write data:
>> > >
>> > > class MyStorageClient {
>> > >
>> > >     public:
>> > >
>> > >         /// \brief Construct a client
>> > >
>> > >         MyStorageClient(const std::string& service_location);
>> > >
>> > >
>> > >
>> > >         /// \brief Read data from a table streamingly
>> > >
>> > >         /// \param[in] table_uri
>> > >
>> > >         /// \param[in] start_time The start time (inclusive), e.g.,
>> > > '20100101'
>> > >
>> > >         /// \param[in] end_time The end time (exclusive), e.g.,
>> > '20100110'
>> > >
>> > >         arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>>
>> > > ReadStream(const std::string& table_uri, const std::string& start_time,
>> > > const std::string& end_time);
>> > >
>> > >
>> > >
>> > >         /// \brief Write data to a table streamingly
>> > >
>> > >         /// This method will return a FlightStreamWriter that can be used
>> > > for streaming data into
>> > >
>> > >         /// \param[in] table_uri
>> > >
>> > >         /// \param[in] start_time The start time (inclusive), e.g.,
>> > > '20100101'
>> > >
>> > >         /// \param[in] end_time The end time (exclusive), e.g.,
>> > '20100110'
>> > >
>> > >         arrow::Result<DoPutResult> WriteStream(const std::string&
>> > > table_uri, const std::shared_ptr<arrow::Schema> &schema, const
>> > std::string
>> > > &start_time, const std::string &end_time);
>> > >
>> > >
>> > >
>> > >         /// \brief Get schema of a table.
>> > >
>> > >         /// \param[in] table The Smooth table name, e.g.,
>> > > smooth:/research/user/ljin/test
>> > >
>> > >         arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(const
>> > > std::string& table_uri);
>> > >     };
>> > >
>> > > I think Acero node's schema must be known when the node is created, I'd
>> > > imagine I would implement MyStorageExecNode that gets created by
>> > > SubstraitConsumer (via some registration mechanism in SubstraitConsumer):
>> > >
>> > > (1) GetSchema is called in SubstraitConsumer when creating the node
>> > > (network call to the storage backend to get schema)
>> > > (2) ReadStream is called in either ExecNode::Init or
>> > > ExecNode::StartProducing to create the FlightStreamReader
>> > > (3) Some thread (either the Plan's execution thread or the thread
>> > > owned by MyStorageExecNode) will read from FlightStreamReader and
>> > > send data downstream.
>> > >
>> > > Does that sound like the right approach or is there some other way I
>> > should
>> > > do this?
>> > >
>> > > On Wed, Aug 31, 2022 at 6:16 PM David Li <li...@apache.org> wrote:
>> > >
>> > > > Hi Li,
>> > > >
>> > > > It'd depend on how exactly you expect everything to fit together, and I
>> > > > think the way you'd go about it would depend on what exactly the
>> > > > application is. For instance, you could have the application code do
>> > > > everything up through DoGet and get a reader, then create a SourceNode
>> > > from
>> > > > the reader and continue from there.
>> > > >
>> > > > Otherwise, I would think the way to go would be to be able to create a
>> > > > node from a FlightDescriptor (which would contain the URL/parameters in
>> > > > your example). In that case, I think it'd fit into Arrow Dataset, under
>> > > > ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
>> > > > discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As
>> > a
>> > > > bonus, there's already good integration between Dataset and Acero and
>> > > this
>> > > > should naturally do things like read the FlightEndpoints in parallel
>> > with
>> > > > readahead and so on.
>> > > >
>> > > > That means: you'd start with the FlightDescriptor, and create a Dataset
>> > > > from it. This will call GetFlightInfo under the hood. (There's a minor
>> > > > catch here: this assumes the service that returns the FlightInfo can
>> > > embed
>> > > > an accurate schema into it. If that's not true, there'll have to be
>> > some
>> > > > finagling with various ways of getting the actual schema, depending on
>> > > what
>> > > > exactly your service supports.) Once you have a Dataset, you can create
>> > > an
>> > > > ExecPlan and proceed like normal.
>> > > >
>> > > > Of course, if you then want to get things into Python, R, Substrait,
>> > > > etc... that requires some more work - especially for Substrait where
>> > I'm
>> > > > not sure how best to encode a custom source like that.
>> > > >
>> > > > [1]: https://issues.apache.org/jira/browse/ARROW-10524
>> > > >
>> > > > -David
>> > > >
>> > > > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
>> > > > > Hello!
>> > > > >
>> > > > > I have recently started to look into integrating Flight RPC with
>> > Acero
>> > > > > source/sink node.
>> > > > >
>> > > > > In Flight, the life cycle of a "read" request looks sth like:
>> > > > >
>> > > > >    - User specifies a URL (e.g. my_storage://my_path) and parameter
>> > > > (e.g.,
>> > > > >    begin = "20220101", end = "20220201")
>> > > > >    - Client issues GetFlightInfo and gets FlightInfo from the server
>> > > > >    - Client issues DoGet with the FlightInfo and gets a stream reader
>> > > > >    - Client calls Next until the stream is exhausted
>> > > > >
>> > > > > My question is, how does the above life cycle fit in an Acero node?
>> > In
>> > > > > other words, what are the proper places in Acero node lifecycle to
>> > > issue
>> > > > > the corresponding flight RPC?
>> > > > >
>> > > > > Appreciate any thoughts,
>> > > > > Li
>> > > >
>> > >
>> >

Re: Integration between Flight and Acero

Posted by Weston Pace <we...@gmail.com>.
> The alternative path of subclassing SourceNode and having ExecNode::Init or
> ExecNode::StartProducing seems quite a bit of change (also I don't think
> SourceNode is exposed via public header). But let me know if you think I am
> missing something.

Agreed that we don't want to go this route.  David's suggestion is a
good idea.  However, this shouldn't be the responsibility of the
caller exactly.

In other words (and my lack of detailed knowledge about flight is
probably going to leak here) there should still be a factory function
(e.g. "flight_source" or something like that) and a custom options
object (FlightSourceOptions).

To start with I think a custom factory function will be sufficient
(e.g. look at MakeScanNode in scanner.cc for an example).  So the
options would somehow describe the coordinates of the flight endpoint.
The factory function would open a connection to the flight endpoint
and convert this into a record batch reader.  Then it would create one
of the nodes that Yaron has contributed and return that.
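
A rough sketch of that shape, in case it helps. This uses stdlib-only
stand-in types, not the real Arrow/Acero classes: Batch, BatchReader,
ReaderSourceNode, Connector, VectorReader and MakeFlightSource are all
hypothetical names, and FlightSourceOptions is only the name floated above,
not an existing API. The Connector hook stands in for "open a connection to
the flight endpoint and get a reader".

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for arrow::RecordBatch (hypothetical; just a column of ints).
using Batch = std::vector<int>;

// Stand-in for a record batch reader: yields batches until exhausted.
class BatchReader {
 public:
  virtual ~BatchReader() = default;
  virtual std::optional<Batch> Next() = 0;
};

// Hypothetical options object holding the "coordinates" of the endpoint.
struct FlightSourceOptions {
  std::string location;    // where the service lives
  std::string descriptor;  // what to read, e.g. "my_storage://my_path"
};

// Hypothetical hook that opens the stream (GetFlightInfo + DoGet in practice).
using Connector =
    std::function<std::unique_ptr<BatchReader>(const FlightSourceOptions&)>;

// Stand-in for a source node that is fed by a reader.
class ReaderSourceNode {
 public:
  explicit ReaderSourceNode(std::unique_ptr<BatchReader> reader)
      : reader_(std::move(reader)) {}

  // Drain the reader, handing each batch to the downstream callback.
  // Returns the number of batches produced.
  std::size_t StartProducing(
      const std::function<void(const Batch&)>& downstream) {
    std::size_t produced = 0;
    while (auto batch = reader_->Next()) {
      downstream(*batch);
      ++produced;
    }
    return produced;
  }

 private:
  std::unique_ptr<BatchReader> reader_;
};

// The factory function: connects eagerly, then wraps the reader in a node.
std::unique_ptr<ReaderSourceNode> MakeFlightSource(
    const FlightSourceOptions& options, const Connector& connect) {
  return std::make_unique<ReaderSourceNode>(connect(options));
}

// In-memory reader used only to exercise the sketch.
class VectorReader : public BatchReader {
 public:
  explicit VectorReader(std::vector<Batch> batches)
      : batches_(std::move(batches)) {}

  std::optional<Batch> Next() override {
    if (pos_ >= batches_.size()) return std::nullopt;
    return batches_[pos_++];
  }

 private:
  std::vector<Batch> batches_;
  std::size_t pos_ = 0;
};
```

Note that the connection is opened inside MakeFlightSource, i.e. at plan
creation time, which is the trade-off Li mentioned.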

However, it might be nice if "open a connection to the flight
endpoint" happened during the call to StartProducing and not during
the factory function call.  This could maybe be a follow-up task.
Perhaps source node could change so that, instead of accepting an
AsyncGenerator, it accepts an AsyncGenerator factory function.  Then
it could execute that function during the call to StartProducing.
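
To make the follow-up idea concrete, here is a minimal sketch of a source
node that takes a generator factory and only invokes it in StartProducing.
It is synchronous and stdlib-only, so it is not Acero's AsyncGenerator
(which returns futures); Batch, Generator, GeneratorFactory and
LazySourceNode are hypothetical names used for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Stand-in for arrow::RecordBatch (hypothetical; just a column of ints).
using Batch = std::vector<int>;

// A generator yields the next batch, or std::nullopt at end of stream.
using Generator = std::function<std::optional<Batch>()>;

// A generator factory builds the generator on demand. Opening the Flight
// connection would happen inside this call.
using GeneratorFactory = std::function<Generator()>;

class LazySourceNode {
 public:
  // Construction only stores the factory; nothing is connected yet.
  explicit LazySourceNode(GeneratorFactory factory)
      : factory_(std::move(factory)) {}

  // The factory runs here, so the connection is opened at execution time.
  // Returns the number of batches pushed downstream.
  std::size_t StartProducing(
      const std::function<void(const Batch&)>& downstream) {
    Generator generator = factory_();
    std::size_t produced = 0;
    while (auto batch = generator()) {
      downstream(*batch);
      ++produced;
    }
    return produced;
  }

 private:
  GeneratorFactory factory_;
};
```

The node's constructor stays cheap and free of network calls, and any
connection error would surface when the plan starts rather than when it is
built.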

On Tue, Sep 13, 2022 at 4:05 PM Li Jin <ic...@gmail.com> wrote:
>
> Thanks Yaron for the pointer to that PR.
>
> On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili <rt...@hotmail.com> wrote:
>
> > If you can wrap the flight reader as a RecordBatchReader, then another
> > possibility is using an upcoming PR (
> > https://github.com/apache/arrow/pull/14041) that enables SourceNode to
> > accept it. You would need to know the schema when configuring the
> > SourceNode, but you won't need to derive from SourceNode.
> >
> >
> > Yaron.
> > ________________________________
> > From: Li Jin <ic...@gmail.com>
> > Sent: Tuesday, September 13, 2022 3:58 PM
> > To: dev@arrow.apache.org <de...@arrow.apache.org>
> > Subject: Re: Integration between Flight and Acero
> >
> > Update:
> >
> > I am going to try what David Li suggested here:
> > https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v
> >
> > This seems to be the least amount of code. This does require calling
> > "DoGet" at Acero plan/node creation time rather than execution time but I
> > don't think it's a big deal for now.
> >
> > The alternative path of subclassing SourceNode and having ExecNode::Init or
> > ExecNode::StartProducing seems quite a bit of change (also I don't think
> > SourceNode is exposed via public header). But let me know if you think I am
> > missing something.
> >
> > Li
> >
> > On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Hi Li,
> > >
> > > Here's my 2 cents about the Ibis/Substrait part of this.
> > >
> > > An Ibis expression carries a schema. If you're planning to create an
> > > integrated Ibis/Substrait/Arrow solution, then you'll need the schema to
> > be
> > > available to Ibis in Python. So, you'll need a Python wrapper for the C++
> > > implementation you have in mind for the GetSchema method. I think you
> > > should pass the schema obtained by (the wrapped) GetSchema to an Ibis
> > node,
> > > rather than defining a new Ibis node that would have to access the
> > network
> > > to get the schema on its own.
> > >
> > > Given the above, I agree with you that when the Acero node is created its
> > > schema would already be known.
> > >
> > >
> > > Yaron.
> > > ________________________________
> > > From: Li Jin <ic...@gmail.com>
> > > Sent: Thursday, September 1, 2022 2:49 PM
> > > To: dev@arrow.apache.org <de...@arrow.apache.org>
> > > Subject: Re: Integration between Flight and Acero
> > >
> > > Thanks David. I think my original question might not have been accurate
> > so
> > > I will try to rephrase my question:
> > >
> > > My ultimate goal is to add an ibis source node:
> > >
> > > class MyStorageTable(ibis.TableNode, sch.HasSchema):
> > >     url = ... # e.g. "my_storage://my_path"
> > >     begin = ... # e.g. "20220101"
> > >     end = ... # e.g. "20220201"
> > >
> > > and pass it to Acero and have Acero create a source node that knows how
> > to
> > > read from my_storage. Currently, I have a C++ class that looks like this
> > > that knows how to read/write data:
> > >
> > > class MyStorageClient {
> > >
> > >     public:
> > >
> > >         /// \brief Construct a client
> > >
> > >         MyStorageClient(const std::string& service_location);
> > >
> > >
> > >
> > >         /// \brief Read data from a table streamingly
> > >
> > >         /// \param[in] table_uri
> > >
> > >         /// \param[in] start_time The start time (inclusive), e.g.,
> > > '20100101'
> > >
> > >         /// \param[in] end_time The end time (exclusive), e.g.,
> > '20100110'
> > >
> > >         arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>>
> > > ReadStream(const std::string& table_uri, const std::string& start_time,
> > > const std::string& end_time);
> > >
> > >
> > >
> > >         /// \brief Write data to a table streamingly
> > >
> > >         /// This method will return a FlightStreamWriter that can be used
> > > for streaming data into
> > >
> > >         /// \param[in] table_uri
> > >
> > >         /// \param[in] start_time The start time (inclusive), e.g.,
> > > '20100101'
> > >
> > >         /// \param[in] end_time The end time (exclusive), e.g.,
> > '20100110'
> > >
> > >         arrow::Result<DoPutResult> WriteStream(const std::string&
> > > table_uri, const std::shared_ptr<arrow::Schema> &schema, const
> > std::string
> > > &start_time, const std::string &end_time);
> > >
> > >
> > >
> > >         /// \brief Get schema of a table.
> > >
> > >         /// \param[in] table The Smooth table name, e.g.,
> > > smooth:/research/user/ljin/test
> > >
> > >         arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(const
> > > std::string& table_uri);
> > >     };
> > >
> > > I think Acero node's schema must be known when the node is created, I'd
> > > imagine I would implement MyStorageExecNode that gets created by
> > > SubstraitConsumer (via some registration mechanism in SubstraitConsumer):
> > >
> > > (1) GetSchema is called in SubstraitConsumer when creating the node
> > > (network call to the storage backend to get schema)
> > > > (2) ReadStream is called in either ExecNode::Init or
> > > > ExecNode::StartProducing to create the FlightStreamReader
> > > > (3) Some thread (either the Plan's execution thread or the thread
> > > > owned by MyStorageExecNode) will read from FlightStreamReader and
> > > > send data downstream.
> > >
> > > Does that sound like the right approach or is there some other way I
> > should
> > > do this?
> > >
> > > On Wed, Aug 31, 2022 at 6:16 PM David Li <li...@apache.org> wrote:
> > >
> > > > Hi Li,
> > > >
> > > > It'd depend on how exactly you expect everything to fit together, and I
> > > > think the way you'd go about it would depend on what exactly the
> > > > application is. For instance, you could have the application code do
> > > > everything up through DoGet and get a reader, then create a SourceNode
> > > from
> > > > the reader and continue from there.
> > > >
> > > > Otherwise, I would think the way to go would be to be able to create a
> > > > node from a FlightDescriptor (which would contain the URL/parameters in
> > > > your example). In that case, I think it'd fit into Arrow Dataset, under
> > > > ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
> > > > discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As
> > a
> > > > bonus, there's already good integration between Dataset and Acero and
> > > this
> > > > should naturally do things like read the FlightEndpoints in parallel
> > with
> > > > readahead and so on.
> > > >
> > > > That means: you'd start with the FlightDescriptor, and create a Dataset
> > > > from it. This will call GetFlightInfo under the hood. (There's a minor
> > > > catch here: this assumes the service that returns the FlightInfo can
> > > embed
> > > > an accurate schema into it. If that's not true, there'll have to be
> > some
> > > > finagling with various ways of getting the actual schema, depending on
> > > what
> > > > exactly your service supports.) Once you have a Dataset, you can create
> > > an
> > > > ExecPlan and proceed like normal.
> > > >
> > > > Of course, if you then want to get things into Python, R, Substrait,
> > > > etc... that requires some more work - especially for Substrait where
> > I'm
> > > > not sure how best to encode a custom source like that.
> > > >
> > > > [1]: https://issues.apache.org/jira/browse/ARROW-10524
> > > >
> > > > -David
> > > >
> > > > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> > > > > Hello!
> > > > >
> > > > > I have recently started to look into integrating Flight RPC with
> > Acero
> > > > > source/sink node.
> > > > >
> > > > > In Flight, the life cycle of a "read" request looks sth like:
> > > > >
> > > > >    - User specifies a URL (e.g. my_storage://my_path) and parameter
> > > > (e.g.,
> > > > >    begin = "20220101", end = "20220201")
> > > > > >    - Client issues GetFlightInfo and gets FlightInfo from the server
> > > > > >    - Client issues DoGet with the FlightInfo and gets a stream reader
> > > > > >    - Client calls Next until the stream is exhausted
> > > > >
> > > > > My question is, how does the above life cycle fit in an Acero node?
> > In
> > > > > other words, what are the proper places in Acero node lifecycle to
> > > issue
> > > > > the corresponding flight RPC?
> > > > >
> > > > > Appreciate any thoughts,
> > > > > Li
> > > >
> > >
> >

Re: Integration between Flight and Acero

Posted by Li Jin <ic...@gmail.com>.
Thanks Yaron for the pointer to that PR.

On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili <rt...@hotmail.com> wrote:

> If you can wrap the flight reader as a RecordBatchReader, then another
> possibility is using an upcoming PR (
> https://github.com/apache/arrow/pull/14041) that enables SourceNode to
> accept it. You would need to know the schema when configuring the
> SourceNode, but you won't need to derive from SourceNode.
>
>
> Yaron.
> ________________________________
> From: Li Jin <ic...@gmail.com>
> Sent: Tuesday, September 13, 2022 3:58 PM
> To: dev@arrow.apache.org <de...@arrow.apache.org>
> Subject: Re: Integration between Flight and Acero
>
> Update:
>
> I am going to try what David Li suggested here:
> https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v
>
> This seems to be the least amount of code. This does require calling
> "DoGet" at Acero plan/node creation time rather than execution time but I
> don't think it's a big deal for now.
>
> The alternative path of subclassing SourceNode and having ExecNode::Init or
> ExecNode::StartProducing seems quite a bit of change (also I don't think
> SourceNode is exposed via public header). But let me know if you think I am
> missing something.
>
> Li
>
> On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
> > Hi Li,
> >
> > Here's my 2 cents about the Ibis/Substrait part of this.
> >
> > An Ibis expression carries a schema. If you're planning to create an
> > integrated Ibis/Substrait/Arrow solution, then you'll need the schema to
> be
> > available to Ibis in Python. So, you'll need a Python wrapper for the C++
> > implementation you have in mind for the GetSchema method. I think you
> > should pass the schema obtained by (the wrapped) GetSchema to an Ibis
> node,
> > rather than defining a new Ibis node that would have to access the
> network
> > to get the schema on its own.
> >
> > Given the above, I agree with you that when the Acero node is created its
> > schema would already be known.
> >
> >
> > Yaron.
> > ________________________________
> > From: Li Jin <ic...@gmail.com>
> > Sent: Thursday, September 1, 2022 2:49 PM
> > To: dev@arrow.apache.org <de...@arrow.apache.org>
> > Subject: Re: Integration between Flight and Acero
> >
> > Thanks David. I think my original question might not have been accurate
> so
> > I will try to rephrase my question:
> >
> > My ultimate goal is to add an ibis source node:
> >
> > class MyStorageTable(ibis.TableNode, sch.HasSchema):
> >     url = ... # e.g. "my_storage://my_path"
> >     begin = ... # e.g. "20220101"
> >     end = ... # e.g. "20220201"
> >
> > and pass it to Acero and have Acero create a source node that knows how
> to
> > read from my_storage. Currently, I have a C++ class that looks like this
> > that knows how to read/write data:
> >
> > class MyStorageClient {
> >
> >     public:
> >
> >         /// \brief Construct a client
> >
> >         MyStorageClient(const std::string& service_location);
> >
> >
> >
> >         /// \brief Read data from a table streamingly
> >
> >         /// \param[in] table_uri
> >
> >         /// \param[in] start_time The start time (inclusive), e.g.,
> > '20100101'
> >
> >         /// \param[in] end_time The end time (exclusive), e.g.,
> '20100110'
> >
> >         arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>>
> > ReadStream(const std::string& table_uri, const std::string& start_time,
> > const std::string& end_time);
> >
> >
> >
> >         /// \brief Write data to a table streamingly
> >
> >         /// This method will return a FlightStreamWriter that can be used
> > for streaming data into
> >
> >         /// \param[in] table_uri
> >
> >         /// \param[in] start_time The start time (inclusive), e.g.,
> > '20100101'
> >
> >         /// \param[in] end_time The end time (exclusive), e.g.,
> '20100110'
> >
> >         arrow::Result<DoPutResult> WriteStream(const std::string&
> > table_uri, const std::shared_ptr<arrow::Schema> &schema, const
> std::string
> > &start_time, const std::string &end_time);
> >
> >
> >
> >         /// \brief Get schema of a table.
> >
> >         /// \param[in] table The Smooth table name, e.g.,
> > smooth:/research/user/ljin/test
> >
> >         arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(const
> > std::string& table_uri);
> >     };
> >
> > I think Acero node's schema must be known when the node is created, I'd
> > imagine I would implement MyStorageExecNode that gets created by
> > SubstraitConsumer (via some registration mechanism in SubstraitConsumer):
> >
> > (1) GetSchema is called in SubstraitConsumer when creating the node
> > (network call to the storage backend to get schema)
> > > (2) ReadStream is called in either ExecNode::Init or
> > > ExecNode::StartProducing to create the FlightStreamReader
> > > (3) Some thread (either the Plan's execution thread or the thread
> > > owned by MyStorageExecNode) will read from FlightStreamReader and
> > > send data downstream.
> >
> > Does that sound like the right approach or is there some other way I
> should
> > do this?
> >
> > On Wed, Aug 31, 2022 at 6:16 PM David Li <li...@apache.org> wrote:
> >
> > > Hi Li,
> > >
> > > It'd depend on how exactly you expect everything to fit together, and I
> > > think the way you'd go about it would depend on what exactly the
> > > application is. For instance, you could have the application code do
> > > everything up through DoGet and get a reader, then create a SourceNode
> > from
> > > the reader and continue from there.
> > >
> > > Otherwise, I would think the way to go would be to be able to create a
> > > node from a FlightDescriptor (which would contain the URL/parameters in
> > > your example). In that case, I think it'd fit into Arrow Dataset, under
> > > ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
> > > discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As
> a
> > > bonus, there's already good integration between Dataset and Acero and
> > this
> > > should naturally do things like read the FlightEndpoints in parallel
> with
> > > readahead and so on.
> > >
> > > That means: you'd start with the FlightDescriptor, and create a Dataset
> > > from it. This will call GetFlightInfo under the hood. (There's a minor
> > > catch here: this assumes the service that returns the FlightInfo can
> > embed
> > > an accurate schema into it. If that's not true, there'll have to be
> some
> > > finagling with various ways of getting the actual schema, depending on
> > what
> > > exactly your service supports.) Once you have a Dataset, you can create
> > an
> > > ExecPlan and proceed like normal.
> > >
> > > Of course, if you then want to get things into Python, R, Substrait,
> > > etc... that requires some more work - especially for Substrait where
> I'm
> > > not sure how best to encode a custom source like that.
> > >
> > > [1]: https://issues.apache.org/jira/browse/ARROW-10524
> > >
> > > -David
> > >
> > > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> > > > Hello!
> > > >
> > > > I have recently started to look into integrating Flight RPC with
> Acero
> > > > source/sink node.
> > > >
> > > > In Flight, the life cycle of a "read" request looks sth like:
> > > >
> > > >    - User specifies a URL (e.g. my_storage://my_path) and parameter
> > > (e.g.,
> > > >    begin = "20220101", end = "20220201")
> > > > >    - Client issues GetFlightInfo and gets FlightInfo from the server
> > > > >    - Client issues DoGet with the FlightInfo and gets a stream reader
> > > > >    - Client calls Next until the stream is exhausted
> > > >
> > > > My question is, how does the above life cycle fit in an Acero node?
> In
> > > > other words, what are the proper places in Acero node lifecycle to
> > issue
> > > > the corresponding flight RPC?
> > > >
> > > > Appreciate any thoughts,
> > > > Li
> > >
> >
>

Re: Integration between Flight and Acero

Posted by Yaron Gvili <rt...@hotmail.com>.
If you can wrap the flight reader as a RecordBatchReader, then another possibility is using an upcoming PR (https://github.com/apache/arrow/pull/14041) that enables SourceNode to accept it. You would need to know the schema when configuring the SourceNode, but you won't need to derive from SourceNode.
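
For illustration only, the wrapping could look roughly like the adapter
below. It is written against stdlib-only stand-ins rather than the actual
Arrow classes: Schema, Batch, StreamChunk, FlightLikeStream, BatchReader and
FlightReaderAdapter are hypothetical names, and the real Flight and
RecordBatchReader interfaces differ in detail (Result/Status return types,
shared pointers to batches, and so on).

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Stand-ins: a schema is just column names, a batch is a column of ints.
using Schema = std::vector<std::string>;
using Batch = std::vector<int>;

// Stand-in for a Flight stream chunk: data plus application metadata.
struct StreamChunk {
  std::optional<Batch> data;  // std::nullopt marks end of stream
  std::string app_metadata;
};

// Stand-in for the reader returned by DoGet.
class FlightLikeStream {
 public:
  FlightLikeStream(Schema schema, std::vector<Batch> batches)
      : schema_(std::move(schema)), batches_(std::move(batches)) {}

  const Schema& GetSchema() const { return schema_; }

  StreamChunk Next() {
    if (pos_ >= batches_.size()) return StreamChunk{std::nullopt, ""};
    return StreamChunk{batches_[pos_++], "meta"};
  }

 private:
  Schema schema_;
  std::vector<Batch> batches_;
  std::size_t pos_ = 0;
};

// Stand-in for the plain reader interface a source node would accept.
class BatchReader {
 public:
  virtual ~BatchReader() = default;
  virtual const Schema& schema() const = 0;
  virtual std::optional<Batch> ReadNext() = 0;
};

// The adapter: exposes the schema up front and drops the per-chunk metadata.
class FlightReaderAdapter : public BatchReader {
 public:
  explicit FlightReaderAdapter(std::unique_ptr<FlightLikeStream> stream)
      : stream_(std::move(stream)) {}

  const Schema& schema() const override { return stream_->GetSchema(); }

  std::optional<Batch> ReadNext() override { return stream_->Next().data; }

 private:
  std::unique_ptr<FlightLikeStream> stream_;
};
```

Because the adapter can report its schema before any batch is read, the
schema is available at the point where the SourceNode is configured.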


Yaron.
________________________________
From: Li Jin <ic...@gmail.com>
Sent: Tuesday, September 13, 2022 3:58 PM
To: dev@arrow.apache.org <de...@arrow.apache.org>
Subject: Re: Integration between Flight and Acero

Update:

I am going to try what David Li suggested here:
https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v

This seems to be the least amount of code. This does require calling
"DoGet" at Acero plan/node creation time rather than execution time but I
don't think it's a big deal for now.

The alternative path of subclassing SourceNode and having ExecNode::Init or
ExecNode::StartProducing seems quite a bit of change (also I don't think
SourceNode is exposed via public header). But let me know if you think I am
missing something.

Li

On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote:

> Hi Li,
>
> Here's my 2 cents about the Ibis/Substrait part of this.
>
> An Ibis expression carries a schema. If you're planning to create an
> integrated Ibis/Substrait/Arrow solution, then you'll need the schema to be
> available to Ibis in Python. So, you'll need a Python wrapper for the C++
> implementation you have in mind for the GetSchema method. I think you
> should pass the schema obtained by (the wrapped) GetSchema to an Ibis node,
> rather than defining a new Ibis node that would have to access the network
> to get the schema on its own.
>
> Given the above, I agree with you that when the Acero node is created its
> schema would already be known.
>
>
> Yaron.
> ________________________________
> From: Li Jin <ic...@gmail.com>
> Sent: Thursday, September 1, 2022 2:49 PM
> To: dev@arrow.apache.org <de...@arrow.apache.org>
> Subject: Re: Integration between Flight and Acero
>
> Thanks David. I think my original question might not have been accurate so
> I will try to rephrase my question:
>
> My ultimate goal is to add an ibis source node:
>
> class MyStorageTable(ibis.TableNode, sch.HasSchema):
>     url = ... # e.g. "my_storage://my_path"
>     begin = ... # e.g. "20220101"
>     end = ... # e.g. "20220201"
>
> and pass it to Acero and have Acero create a source node that knows how to
> read from my_storage. Currently, I have a C++ class that looks like this
> that knows how to read/write data:
>
> class MyStorageClient {
>
>     public:
>
>         /// \brief Construct a client
>
>         MyStorageClient(const std::string& service_location);
>
>
>
>         /// \brief Read data from a table streamingly
>
>         /// \param[in] table_uri
>
>         /// \param[in] start_time The start time (inclusive), e.g.,
> '20100101'
>
>         /// \param[in] end_time The end time (exclusive), e.g., '20100110'
>
>         arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>>
> ReadStream(const std::string& table_uri, const std::string& start_time,
> const std::string& end_time);
>
>
>
>         /// \brief Write data to a table streamingly
>
>         /// This method will return a FlightStreamWriter that can be used
> for streaming data into
>
>         /// \param[in] table_uri
>
>         /// \param[in] start_time The start time (inclusive), e.g.,
> '20100101'
>
>         /// \param[in] end_time The end time (exclusive), e.g., '20100110'
>
>         arrow::Result<DoPutResult> WriteStream(const std::string&
> table_uri, const std::shared_ptr<arrow::Schema> &schema, const std::string
> &start_time, const std::string &end_time);
>
>
>
>         /// \brief Get schema of a table.
>
>         /// \param[in] table The Smooth table name, e.g.,
> smooth:/research/user/ljin/test
>
>         arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(const
> std::string& table_uri);
>     };
>
> I think Acero node's schema must be known when the node is created, I'd
> imagine I would implement MyStorageExecNode that gets created by
> SubstraitConsumer (via some registration mechanism in SubstraitConsumer):
>
> (1) GetSchema is called in SubstraitConsumer when creating the node
> (network call to the storage backend to get schema)
> > (2) ReadStream is called in either ExecNode::Init or
> > ExecNode::StartProducing to create the FlightStreamReader
> > (3) Some thread (either the Plan's execution thread or the thread
> > owned by MyStorageExecNode) will read from FlightStreamReader and
> > send data downstream.
>
> Does that sound like the right approach or is there some other way I should
> do this?
>
> On Wed, Aug 31, 2022 at 6:16 PM David Li <li...@apache.org> wrote:
>
> > Hi Li,
> >
> > It'd depend on how exactly you expect everything to fit together, and I
> > think the way you'd go about it would depend on what exactly the
> > application is. For instance, you could have the application code do
> > everything up through DoGet and get a reader, then create a SourceNode
> from
> > the reader and continue from there.
> >
> > Otherwise, I would think the way to go would be to be able to create a
> > node from a FlightDescriptor (which would contain the URL/parameters in
> > your example). In that case, I think it'd fit into Arrow Dataset, under
> > ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
> > discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As a
> > bonus, there's already good integration between Dataset and Acero and
> this
> > should naturally do things like read the FlightEndpoints in parallel with
> > readahead and so on.
> >
> > That means: you'd start with the FlightDescriptor, and create a Dataset
> > from it. This will call GetFlightInfo under the hood. (There's a minor
> > catch here: this assumes the service that returns the FlightInfo can
> embed
> > an accurate schema into it. If that's not true, there'll have to be some
> > finagling with various ways of getting the actual schema, depending on
> what
> > exactly your service supports.) Once you have a Dataset, you can create
> an
> > ExecPlan and proceed like normal.
> >
> > Of course, if you then want to get things into Python, R, Substrait,
> > etc... that requires some more work - especially for Substrait where I'm
> > not sure how best to encode a custom source like that.
> >
> > [1]: https://issues.apache.org/jira/browse/ARROW-10524
> >
> > -David
> >
> > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> > > Hello!
> > >
> > > I have recently started to look into integrating Flight RPC with Acero
> > > source/sink node.
> > >
> > > In Flight, the life cycle of a "read" request looks sth like:
> > >
> > >    - User specifies a URL (e.g. my_storage://my_path) and parameter
> > (e.g.,
> > >    begin = "20220101", end = "20220201")
> > > >    - Client issues GetFlightInfo and gets FlightInfo from the server
> > > >    - Client issues DoGet with the FlightInfo and gets a stream reader
> > > >    - Client calls Next until the stream is exhausted
> > >
> > > My question is, how does the above life cycle fit in an Acero node? In
> > > other words, what are the proper places in Acero node lifecycle to
> issue
> > > the corresponding flight RPC?
> > >
> > > Appreciate any thoughts,
> > > Li
> >
>

Re: Integration between Flight and Acero

Posted by Li Jin <ic...@gmail.com>.
Update:

I am going to try what David Li suggested here:
https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v

This seems to be the least amount of code. This does require calling
"DoGet" at Acero plan/node creation time rather than execution time but I
don't think it's a big deal for now.

The alternative path of subclassing SourceNode and having ExecNode::Init or
ExecNode::StartProducing seems quite a bit of change (also I don't think
SourceNode is exposed via public header). But let me know if you think I am
missing something.

Li

On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote:

> Hi Li,
>
> Here's my 2 cents about the Ibis/Substrait part of this.
>
> An Ibis expression carries a schema. If you're planning to create an
> integrated Ibis/Substrait/Arrow solution, then you'll need the schema to be
> available to Ibis in Python. So, you'll need a Python wrapper for the C++
> implementation you have in mind for the GetSchema method. I think you
> should pass the schema obtained by (the wrapped) GetSchema to an Ibis node,
> rather than defining a new Ibis node that would have to access the network
> to get the schema on its own.
>
> Given the above, I agree with you that when the Acero node is created its
> schema would already be known.
>
>
> Yaron.

Re: Integration between Flight and Acero

Posted by Yaron Gvili <rt...@hotmail.com>.
Hi Li,

Here's my 2 cents about the Ibis/Substrait part of this.

An Ibis expression carries a schema. If you're planning to create an integrated Ibis/Substrait/Arrow solution, then you'll need the schema to be available to Ibis in Python. So, you'll need a Python wrapper for the C++ implementation you have in mind for the GetSchema method. I think you should pass the schema obtained by (the wrapped) GetSchema to an Ibis node, rather than defining a new Ibis node that would have to access the network to get the schema on its own.

Given the above, I agree with you that when the Acero node is created its schema would already be known.
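
A rough sketch of that arrangement, in plain Python without Ibis or Arrow.
The names are hypothetical: get_schema stands in for a Python wrapper around
the C++ GetSchema, MyStorageTable is a simplified stand-in for the Ibis node,
and the column names are made up for illustration.

```python
from dataclasses import dataclass
from typing import List, Tuple

Schema = Tuple[Tuple[str, str], ...]  # (column name, type name) pairs

network_calls = 0  # counts how often the stand-in wrapper is invoked


def get_schema(table_uri: str) -> Schema:
    """Hypothetical stand-in for the wrapped C++ GetSchema (a network call)."""
    global network_calls
    network_calls += 1
    return (("time", "timestamp"), ("value", "double"))


@dataclass(frozen=True)
class MyStorageTable:
    """Simplified stand-in for the Ibis node: it holds the schema, it never fetches it."""

    url: str
    begin: str
    end: str
    schema: Schema


url = "my_storage://my_path"
schema = get_schema(url)  # the caller makes the one network call
table = MyStorageTable(url, "20220101", "20220201", schema)
column_names: List[str] = [name for name, _ in table.schema]
```

Keeping the network access outside the node means building or copying the
expression never triggers an RPC.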


Yaron.
________________________________
From: Li Jin <ic...@gmail.com>
Sent: Thursday, September 1, 2022 2:49 PM
To: dev@arrow.apache.org <de...@arrow.apache.org>
Subject: Re: Integration between Flight and Acero

Thanks David. I think my original question might not have been accurate so
I will try to rephrase my question:

My ultimate goal is to add an ibis source node:

class MyStorageTable(ibis.TableNode, sch.HasSchema):
    url = ... # e.g. "my_storage://my_path"
    begin = ... # e.g. "20220101"
    end = ... # e.g. "20220201"

and pass it to Acero and have Acero create a source node that knows how to
read from my_storage. Currently, I have a C++ class that looks like this
that knows how to read/write data:

class MyStorageClient {
 public:
  /// \brief Construct a client
  MyStorageClient(const std::string& service_location);

  /// \brief Read data from a table as a stream
  /// \param[in] table_uri
  /// \param[in] start_time The start time (inclusive), e.g., '20100101'
  /// \param[in] end_time The end time (exclusive), e.g., '20100110'
  arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>> ReadStream(
      const std::string& table_uri, const std::string& start_time,
      const std::string& end_time);

  /// \brief Write data to a table as a stream
  ///
  /// This method will return a FlightStreamWriter that can be used for
  /// streaming data into the table
  /// \param[in] table_uri
  /// \param[in] schema The schema of the data to write
  /// \param[in] start_time The start time (inclusive), e.g., '20100101'
  /// \param[in] end_time The end time (exclusive), e.g., '20100110'
  arrow::Result<DoPutResult> WriteStream(
      const std::string& table_uri, const std::shared_ptr<arrow::Schema>& schema,
      const std::string& start_time, const std::string& end_time);

  /// \brief Get schema of a table.
  /// \param[in] table_uri The Smooth table name, e.g.,
  ///   smooth:/research/user/ljin/test
  arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(
      const std::string& table_uri);
};

I think an Acero node's schema must be known when the node is created, so I'd
imagine I would implement a MyStorageExecNode that gets created by
SubstraitConsumer (via some registration mechanism in SubstraitConsumer):

(1) GetSchema is called in SubstraitConsumer when creating the node
(network call to the storage backend to get schema)
(2) ReadStream is called in either ExecNode::Init or ExecNode::StartProducing
to create the FlightStreamReader
(3) Some thread (either the Plan's execution thread or a thread owned by
MyStorageExecNode) will read from FlightStreamReader and send data downstream.

Does that sound like the right approach or is there some other way I should
do this?


Re: Integration between Flight and Acero

Posted by Li Jin <ic...@gmail.com>.
Thanks David. I think my original question might not have been accurate so
I will try to rephrase my question:

My ultimate goal is to add an ibis source node:

class MyStorageTable(ibis.TableNode, sch.HasSchema):
    url = ... # e.g. "my_storage://my_path"
    begin = ... # e.g. "20220101"
    end = ... # e.g. "20220201"

and pass it to Acero and have Acero create a source node that knows how to
read from my_storage. Currently, I have a C++ class that looks like this
that knows how to read/write data:

class MyStorageClient {
 public:
  /// \brief Construct a client
  MyStorageClient(const std::string& service_location);

  /// \brief Read data from a table as a stream
  /// \param[in] table_uri
  /// \param[in] start_time The start time (inclusive), e.g., '20100101'
  /// \param[in] end_time The end time (exclusive), e.g., '20100110'
  arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>> ReadStream(
      const std::string& table_uri, const std::string& start_time,
      const std::string& end_time);

  /// \brief Write data to a table as a stream
  ///
  /// This method will return a FlightStreamWriter that can be used for
  /// streaming data into the table
  /// \param[in] table_uri
  /// \param[in] schema The schema of the data to write
  /// \param[in] start_time The start time (inclusive), e.g., '20100101'
  /// \param[in] end_time The end time (exclusive), e.g., '20100110'
  arrow::Result<DoPutResult> WriteStream(
      const std::string& table_uri, const std::shared_ptr<arrow::Schema>& schema,
      const std::string& start_time, const std::string& end_time);

  /// \brief Get schema of a table.
  /// \param[in] table_uri The Smooth table name, e.g.,
  ///   smooth:/research/user/ljin/test
  arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(
      const std::string& table_uri);
};

I think an Acero node's schema must be known when the node is created, so I'd
imagine I would implement a MyStorageExecNode that gets created by
SubstraitConsumer (via some registration mechanism in SubstraitConsumer):

(1) GetSchema is called in SubstraitConsumer when creating the node
(network call to the storage backend to get schema)
(2) ReadStream is called in either ExecNode::Init or ExecNode::StartProducing
to create the FlightStreamReader
(3) Some thread (either the Plan's execution thread or a thread owned by
MyStorageExecNode) will read from FlightStreamReader and send data downstream.
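
The three steps above can be sketched as follows, in plain Python rather
than Acero C++. This is only a model of the proposed lifecycle under stated
assumptions: MyStorageSourceNode, start_producing and the queue used as
"downstream" are hypothetical stand-ins, and a None value is an assumed
end-of-stream marker.

```python
import queue
import threading
from typing import Callable, Iterator, List, Optional


class MyStorageSourceNode:
    """Toy model of the proposed node; not an Acero ExecNode."""

    def __init__(self, schema: List[str],
                 open_stream: Callable[[], Iterator[List[int]]],
                 downstream: "queue.Queue") -> None:
        self.schema = schema              # (1) schema is known at creation
        self._open_stream = open_stream
        self._downstream = downstream
        self._thread: Optional[threading.Thread] = None

    def start_producing(self) -> None:
        reader = self._open_stream()      # (2) stand-in for ReadStream
        self._thread = threading.Thread(target=self._pump, args=(reader,))
        self._thread.start()

    def _pump(self, reader: Iterator[List[int]]) -> None:
        for batch in reader:              # (3) node-owned thread reads...
            self._downstream.put(batch)   # ...and pushes batches downstream
        self._downstream.put(None)        # assumed end-of-stream marker

    def join(self) -> None:
        if self._thread is not None:
            self._thread.join()


sink: "queue.Queue" = queue.Queue()
node = MyStorageSourceNode(["time", "value"], lambda: iter([[1, 2], [3]]), sink)
node.start_producing()
node.join()
received = list(iter(sink.get, None))     # drain until the marker
```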

Does that sound like the right approach or is there some other way I should
do this?

On Wed, Aug 31, 2022 at 6:16 PM David Li <li...@apache.org> wrote:

> Hi Li,
>
> It'd depend on how exactly you expect everything to fit together, and I
> think the way you'd go about it would depend on what exactly the
> application is. For instance, you could have the application code do
> everything up through DoGet and get a reader, then create a SourceNode from
> the reader and continue from there.
>
> Otherwise, I would think the way to go would be to be able to create a
> node from a FlightDescriptor (which would contain the URL/parameters in
> your example). In that case, I think it'd fit into Arrow Dataset, under
> ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
> discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As a
> bonus, there's already good integration between Dataset and Acero and this
> should naturally do things like read the FlightEndpoints in parallel with
> readahead and so on.
>
> That means: you'd start with the FlightDescriptor, and create a Dataset
> from it. This will call GetFlightInfo under the hood. (There's a minor
> catch here: this assumes the service that returns the FlightInfo can embed
> an accurate schema into it. If that's not true, there'll have to be some
> finagling with various ways of getting the actual schema, depending on what
> exactly your service supports.) Once you have a Dataset, you can create an
> ExecPlan and proceed like normal.
>
> Of course, if you then want to get things into Python, R, Substrait,
> etc... that requires some more work - especially for Substrait where I'm
> not sure how best to encode a custom source like that.
>
> [1]: https://issues.apache.org/jira/browse/ARROW-10524
>
> -David

Re: Integration between Flight and Acero

Posted by David Li <li...@apache.org>.
Hi Li,

It'd depend on how exactly you expect everything to fit together, and I think the way you'd go about it would depend on what exactly the application is. For instance, you could have the application code do everything up through DoGet and get a reader, then create a SourceNode from the reader and continue from there.
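
As a rough illustration of that first option, here is a plain-Python sketch
with no Arrow dependency. FakeFlightClient is an in-memory stand-in whose
method names only mirror the RPCs, and source_node is a hypothetical
placeholder for a source node built from an already-open reader.

```python
from typing import Dict, Iterator, List


class FakeFlightClient:
    """In-memory stand-in; method names mirror the RPCs, not a real client API."""

    def __init__(self, tables: Dict[str, List[List[int]]]) -> None:
        self._tables = tables

    def get_flight_info(self, descriptor: str) -> Dict[str, List[str]]:
        if descriptor not in self._tables:
            raise KeyError(descriptor)
        return {"tickets": [descriptor]}

    def do_get(self, ticket: str) -> Iterator[List[int]]:
        return iter(self._tables[ticket])


def source_node(reader: Iterator[List[int]]) -> Iterator[List[int]]:
    """Hypothetical source node: forwards batches from an open reader."""
    for batch in reader:
        yield batch


client = FakeFlightClient({"my_storage://my_path": [[1, 2], [3, 4]]})
info = client.get_flight_info("my_storage://my_path")  # GetFlightInfo
reader = client.do_get(info["tickets"][0])             # DoGet
batches = list(source_node(reader))                    # Next until exhausted
```

The application owns every RPC here; the node only ever sees the reader.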

Otherwise, I would think the way to go would be to be able to create a node from a FlightDescriptor (which would contain the URL/parameters in your example). In that case, I think it'd fit into Arrow Dataset, under ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As a bonus, there's already good integration between Dataset and Acero and this should naturally do things like read the FlightEndpoints in parallel with readahead and so on.
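
To make the mapping concrete, here is a small plain-Python sketch. It is not
the Arrow Dataset API: Endpoint, Fragment, discover and do_get are all
hypothetical stand-ins, and the thread pool only imitates the kind of
parallel read the Dataset integration would provide.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, Dict, Iterator, List


@dataclass(frozen=True)
class Endpoint:
    """Stand-in for a FlightEndpoint: just a ticket."""
    ticket: str


@dataclass(frozen=True)
class Fragment:
    """Stand-in for a dataset Fragment backed by one endpoint."""
    endpoint: Endpoint

    def scan(self, do_get: Callable[[str], Iterator[int]]) -> List[int]:
        return list(do_get(self.endpoint.ticket))


def discover(endpoints: List[Endpoint]) -> List[Fragment]:
    """Discovery step: one Fragment per endpoint listed in the FlightInfo."""
    return [Fragment(e) for e in endpoints]


data: Dict[str, List[int]] = {"part-0": [1, 2], "part-1": [3], "part-2": [4, 5]}


def do_get(ticket: str) -> Iterator[int]:
    """Stand-in for DoGet against an in-memory table."""
    return iter(data[ticket])


fragments = discover([Endpoint(t) for t in sorted(data)])
with ThreadPoolExecutor(max_workers=3) as pool:
    # map() returns results in input order even if reads finish out of order
    per_fragment = list(pool.map(lambda f: f.scan(do_get), fragments))
rows = [row for part in per_fragment for row in part]
```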

That means: you'd start with the FlightDescriptor, and create a Dataset from it. This will call GetFlightInfo under the hood. (There's a minor catch here: this assumes the service that returns the FlightInfo can embed an accurate schema into it. If that's not true, there'll have to be some finagling with various ways of getting the actual schema, depending on what exactly your service supports.) Once you have a Dataset, you can create an ExecPlan and proceed like normal.
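
One possible shape for that finagling, sketched in plain Python. The
fallback order is an assumption, not something the service or Arrow
prescribes: use the schema embedded in the FlightInfo if present, otherwise
try a GetSchema-style call, otherwise read the schema from the stream itself.
All function names are hypothetical.

```python
from typing import Callable, Dict, List, Optional, Tuple

Schema = List[str]


def resolve_schema(flight_info: Dict[str, Optional[Schema]],
                   get_schema: Callable[[], Schema],
                   read_stream_schema: Callable[[], Schema]) -> Tuple[Schema, str]:
    """Return the schema and a label saying where it came from."""
    embedded = flight_info.get("schema")
    if embedded:
        return embedded, "flight_info"
    try:
        return get_schema(), "get_schema"
    except NotImplementedError:
        # Last resort: open the stream and take the schema it reports.
        return read_stream_schema(), "stream"


def unsupported() -> Schema:
    raise NotImplementedError("service does not support this call")


cols = ["time", "value"]
full = resolve_schema({"schema": cols}, unsupported, unsupported)
via_rpc = resolve_schema({"schema": None}, lambda: cols, unsupported)
via_stream = resolve_schema({}, unsupported, lambda: cols)
```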

Of course, if you then want to get things into Python, R, Substrait, etc... that requires some more work - especially for Substrait where I'm not sure how best to encode a custom source like that.

[1]: https://issues.apache.org/jira/browse/ARROW-10524

-David

On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> Hello!
>
> I have recently started to look into integrating Flight RPC with Acero
> source/sink node.
>
> In Flight, the life cycle of a "read" request looks something like:
>
>    - User specifies a URL (e.g. my_storage://my_path) and parameters (e.g.,
>    begin = "20220101", end = "20220201")
>    - Client issues GetFlightInfo and gets FlightInfo from the server
>    - Client issues DoGet with the FlightInfo and gets a stream reader
>    - Client calls Next until the stream is exhausted
>
> My question is, how does the above life cycle fit in an Acero node? In
> other words, what are the proper places in Acero node lifecycle to issue
> the corresponding flight RPC?
>
> Appreciate any thoughts,
> Li