Posted to dev@arrow.apache.org by Sutou Kouhei <ko...@clear-code.com> on 2023/04/26 00:55:28 UTC

[DISCUSS][Format][Flight] Ordered data support

Hi,

I would like to propose adding support for ordered data to
Apache Arrow Flight. If anyone has comments on this
proposal, please share them here or on the issue for this
proposal: https://github.com/apache/arrow/issues/34852

This is one of the proposals in "[DISCUSS] Flight RPC/Flight
SQL/ADBC enhancements":

  https://lists.apache.org/thread/247z3t06mf132nocngc1jkp3oqglz7jp

See also the "Flight RPC: Ordered Data" section in the
design document for the proposals:

  https://docs.google.com/document/d/1jhPyPZSOo2iy0LqIJVUs9KWPyFULVFJXTILDfkadx2g/edit#

Background:

Currently, the endpoints within a FlightInfo explicitly have
no ordering.

This is unnecessarily limiting. Systems can and do implement
distributed sorts, but they can't reflect this in the
current specification.

Proposal:

Add a flag to FlightInfo. If the flag is set, the client may
assume that the data is sorted in the same order as the
endpoints. Otherwise, the client cannot make any assumptions
(as before).

This is a compatible change because the client can just
ignore the flag.
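
As a minimal sketch of how a client library might act on such a
flag (in Ruby; the FlightInfo and Endpoint structs, the "ordered"
field name, and read_all are hypothetical stand-ins for
illustration, not the Arrow Flight API):

```ruby
# Hypothetical stand-ins for illustration; not the Arrow Flight API.
Endpoint = Struct.new(:name, :rows) do
  # Pretend to fetch this endpoint's partition of the result.
  def read
    rows
  end
end
FlightInfo = Struct.new(:endpoints, :ordered)

# Return all rows of a result. When info.ordered is set, endpoints are
# read one after another so the concatenation keeps the server's order.
# Otherwise they are read in parallel and appended in completion order.
def read_all(info)
  if info.ordered
    info.endpoints.flat_map(&:read)
  else
    results = Queue.new
    threads = info.endpoints.map do |endpoint|
      Thread.new { results << endpoint.read }
    end
    threads.each(&:join)
    rows = []
    rows.concat(results.pop) until results.empty?
    rows
  end
end

info = FlightInfo.new(
  [Endpoint.new("e2", %w[F E D]), Endpoint.new("e1", %w[C B A])],
  true
)
p read_all(info)
```

With the flag set, the rows come back in endpoint order (F, E, D,
C, B, A here). A client that ignores the flag can keep reading the
endpoints however it likes; it just cannot rely on the order.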

Implementation:

https://github.com/apache/arrow/pull/35178 is an
implementation of this proposal. The pull request contains the
following:

1. Format changes:
   https://github.com/apache/arrow/pull/35178/files#diff-53b6c132dcc789483c879f667a1c675792b77aae9a056b257d6b20287bb09dba
   * format/Flight.proto

2. Documentation changes:
   https://github.com/apache/arrow/pull/35178/files#diff-839518fb41e923de682e8587f0b6fdb00eb8f3361d360c2f7249284a136a7d89
   * docs/source/format/Flight.rst

3. The C++ implementation and an integration test:
   * cpp/src/arrow/flight/

4. The Java implementation and an integration test (thanks to David Li!):
   * java/flight/

5. The Go implementation and an integration test:
   * go/arrow/flight/
   * go/arrow/internal/flight_integration/

Next:

I'll start a vote on this proposal after we reach consensus
on it.

This is the standard process for format changes.
See also:

* [VOTE] Formalize how to change format
  https://lists.apache.org/thread/jlc4wtt09rfszlzqdl55vrc4dxzscr4c
* GH-35084: [Docs][Format] Add how to change format specification
  https://github.com/apache/arrow/pull/35174


Thanks,
-- 
kou

Re: [DISCUSS][Format][Flight] Ordered data support

Posted by Weston Pace <we...@gmail.com>.
So this would be a case where multiple "endpoints" are acting as a single
"stream of batches"?  Or am I misunderstanding?

What're some scenarios where that would be done?  When would it be
preferred for the client to merge the endpoints instead of the client's
user?


Re: [DISCUSS][Format][Flight] Ordered data support

Posted by David Li <li...@apache.org>.
The server would have to report these as multiple endpoints in all your examples. (There's nothing saying a particular location can only appear once, or that "Endpoint 2" has to come after "Endpoint 1" for the DESC example.)

The flag tells the client if it can fetch data in parallel without regard to order or if it should make sure to preserve the sorting of the data. (The ADBC Flight SQL clients in Go, C++, etc. already had to deal with this.) For instance Acero may care because certain plan nodes require some sort of ordering to be present; knowing a Flight datasource has this ordering could then save having to insert a sort operation into the plan.
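
One way a client could do both, fetching in parallel while still
preserving the order, is sketched below in Ruby. SlowEndpoint and
read_in_order are hypothetical names for illustration, and the
sleep only simulates network latency; this is not how any existing
driver is known to be written.

```ruby
# Hypothetical endpoint whose read takes `delay` seconds.
SlowEndpoint = Struct.new(:name, :rows, :delay) do
  def read
    sleep(delay)
    rows
  end
end

# Start one fetch per endpoint so the reads overlap, then collect the
# results in endpoint order. Thread#value waits for the thread and
# returns the block's result, so a slow first endpoint delays the
# output but never reorders it.
def read_in_order(endpoints)
  threads = endpoints.map { |endpoint| Thread.new { endpoint.read } }
  threads.flat_map(&:value)
end

endpoints = [
  SlowEndpoint.new("e2", %w[F E D], 0.05), # first in order, slowest
  SlowEndpoint.new("e1", %w[C B A], 0.0)
]
p read_in_order(endpoints)
```

The trade-off is memory: results from later endpoints are buffered
until the earlier ones finish, which an unordered client can avoid
by passing batches on as they arrive.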

"Implementation defined" I think would basically devolve to clients always making the conservative/inefficient choice, like the Go ADBC driver always preserving order out of concern for compatibility and Acero always sorting data to use order-dependent nodes.

On Thu, Apr 27, 2023, at 23:55, Andrew Lamb wrote:
> I wonder if we have considered simply removing the statement "There is no
> ordering defined on endpoints. Hence, if the returned data has an ordering,
> it should be returned in a single endpoint." and  replacing it with
> something that says "the relative ordering of data from different endpoints
> is implementation defined"
>
> I am struggling to come up with a concrete use case for the "ordered" flag.
>
> The ticket references "distributed sort" but most distributed sort
> algorithms I know of would produce multiple sorted streams that need to be
> merged together. For example
>
> Endpoint 1: (B, C, D)
> Endpoint 2: (A, E, F)
>
> It is not clear how the "ordered" flag would help here
>
> If the intent is somehow to signal the client it doesn't have to merge
> (e.g. with data like)
>
> Endpoint 1: (A, B, C)
> Endpoint 2:  (D, E, F)
>
> This seems of very limited value: if, for example, the user desired DESC
> order, then the endpoint would return
>
> Endpoint 1: (C, B, A)
> Endpoint 2: (F, E, D)
>
> Which doesn't seem to conform to the updated definition
>
> Andrew

Re: [DISCUSS][Format][Flight] Ordered data support

Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi Andrew,

Thanks for your wording suggestions!
I've merged them.

Here is the latest document:
  http://crossbow.voltrondata.com/pr_docs/35178/format/Flight.html#downloading-data

If there are no more comments in a few days, I'll start voting
on this proposal.


Thanks,
-- 
kou

In <CA...@mail.gmail.com>
  "Re: [DISCUSS][Format][Flight] Ordered data support" on Fri, 28 Apr 2023 08:35:34 -0400,
  Andrew Lamb <al...@influxdata.com> wrote:

> Thank you for the clarification.
> 
> The point I was missing was that this flag is instructing the FlightClient
> how to present the results to the client application, rather than specific
> properties of the underlying stream.
> 
> I can see the value of returning result streams in a specific order
> (by endpoint) or also being able to retrieve the streams from the endpoints
> in any order (and potentially interleave the results from the endpoint as
> they arrive)
> 
> I left some suggestions on clarifying the wording on [1] that might help
> avoid future confusion
> 
> Andrew
> 
> [1] https://github.com/apache/arrow/pull/35178
> 
> On Fri, Apr 28, 2023 at 1:02 AM David Li <li...@apache.org> wrote:
> 
>> For a lot of partitions - you could have a small number of threads
>> consuming a queue of partitions (and deciding whether you need to
>> sequence/renumber their outputs or not), much like what Acero does with a
>> FileSystemDataset.
>>
>> Note that the Flight client itself doesn't do any of this (perhaps it
>> should!); it's clients of Flight that have to deal with this. (...that's a
>> bit confusing)
>>
>> On Fri, Apr 28, 2023, at 19:06, Weston Pace wrote:
>> > Thank you both for the extra information.  Acero couldn't actually merge
>> > the streams today, I was thinking more of DataFusion and Velox which would
>> > often want to keep the streams separate, especially if there was some kind
>> > of filtering or transformation that could be applied before applying a
>> > sorted merge.
>> >
>> > However, I also very much agree that both scenarios are valid.  First, if
>> > there are a lot of partitions (e.g. far more than the # of parallelism
>> > units) then you probably don't want parallel paths for all of them.
>> >
>> > Second, as you said, simpler clients (e.g. those where all filtering is
>> > done downstream, or those that don't need any filtering at all) will
>> > appreciate Flight's ability to merge for them.  It makes the client more
>> > complex but given that clients are already doing this to some extent it
>> > seems worthwhile.
>> >
>> > On Thu, Apr 27, 2023 at 7:45 PM David Li <li...@apache.org> wrote:
>> >
>> >> In addition to Kou's response:
>> >>
>> >> The individual endpoints have always represented a subset of a single
>> >> stream of data. So each endpoint in a FlightInfo is a partition of the
>> >> overall result set.
>> >>
>> >> Not all clients want to deal with reading all the Flight streams
>> >> themselves and may want a single stream of data. (For example: ADBC exposes
>> >> both paths. The JDBC driver also has to deal with this.) So some client
>> >> libraries have to deal with the question of whether to read in parallel and
>> >> whether to keep the result in order or not. A more advanced use case, like
>> >> Acero, would probably read the endpoints itself and could use this flag to
>> >> decide how to merge the streams.
>> >>
>> >> On Fri, Apr 28, 2023, at 09:56, Sutou Kouhei wrote:
>> >> > Hi,
>> >> >
>> >> >> This seems of very limited value: if, for example, the user desired DESC
>> >> >> order, then the endpoint would return
>> >> >>
>> >> >> Endpoint 1: (C, B, A)
>> >> >> Endpoint 2: (F, E, D)
>> >> >
>> >> > As David said, the server returns
>> >> >
>> >> > Endpoint 2: (F, E, D)
>> >> > Endpoint 1: (C, B, A)
>> >> >
>> >> > in this case.
>> >> >
>> >> > Here is a use case I have in mind:
>> >> >
>> >> > A system has time series data. Each node in the system has
>> >> > data for one day. If a client requests "SELECT * FROM data
>> >> > WHERE server = 'server1' ORDER BY created_at DESC", the
>> >> > system returns the following:
>> >> >
>> >> > Endpoint 20230428: (DATA_FOR_2023_04_28)
>> >> > Endpoint 20230427: (DATA_FOR_2023_04_27)
>> >> > Endpoint 20230426: (DATA_FOR_2023_04_26)
>> >> > ...
>> >> >
>> >> > If we have the "ordered" flag, the client can assume that
>> >> > the received data is sorted. In other words, if the client
>> >> > reads data from Endpoint 20230428 -> Endpoint 20230427 ->
>> >> > Endpoint 20230426, the data the client reads is sorted.
>> >> >
>> >> > If we don't have the "ordered" flag and we use "the relative
>> >> > ordering of data from different endpoints is implementation
>> >> > defined", we can't implement a general purpose Flight based
>> >> > client library (Flight SQL based client library, Flight SQL
>> >> > based ADBC driver and so on) without server-specific
>> >> > branches. The client library would need code like the following:
>> >> >
>> >> >   # TODO: How to detect server_type?
>> >> >   if server_type == "DB1"
>> >> >     # DB1 returns ordered results, so read the endpoints one by one.
>> >> >     endpoints.each do |endpoint|
>> >> >       yield(endpoint.read)
>> >> >     end
>> >> >   else
>> >> >     # Other DBs don't return ordered results.
>> >> >     # So, we read data in parallel for performance.
>> >> >     threads = endpoints.collect do |endpoint|
>> >> >       Thread.new do
>> >> >         yield(endpoint.read)
>> >> >       end
>> >> >     end
>> >> >     threads.each do |thread|
>> >> >       thread.join
>> >> >     end
>> >> >   end
>> >> >
>> >> > The client library needs to add 'or server_type == "DB2"' to
>> >> > 'if server_type == "DB1"' when DB2 also adds support for
>> >> > ordered results. If only DB2 2.0 or later returns ordered
>> >> > results, the client library needs a further condition: 'or
>> >> > (server_type == "DB2" and server_version >= 2.0)'.
>> >> >
>> >> > So I think that the "ordered" flag is useful.
>> >> >
>> >> >
>> >> > Thanks,
>> >> > --
>> >> > kou
>> >> >

Re: [DISCUSS][Format][Flight] Ordered data support

Posted by Andrew Lamb <al...@influxdata.com>.
Thank you for the clarification.

The point I was missing was that this flag is instructing the FlightClient
how to present the results to the client application, rather than specific
properties of the underlying stream.

I can see the value of returning result streams in a specific order
(by endpoint) or also being able to retrieve the streams from the endpoints
in any order (and potentially interleave the results from the endpoint as
they arrive)

I left some suggestions on clarifying the wording on [1] that might help
avoid future confusion

Andrew

[1] https://github.com/apache/arrow/pull/35178


Re: [DISCUSS][Format][Flight] Ordered data support

Posted by David Li <li...@apache.org>.
For a lot of partitions, you could have a small number of threads consuming a queue of partitions (and deciding whether their outputs need to be sequenced/renumbered or not), much like what Acero does with a FileSystemDataset.

Note that the Flight client itself doesn't do any of this (perhaps it should!); it's clients of Flight that have to deal with this. (...that's a bit confusing)
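
A minimal Ruby sketch of the bounded-worker idea above, under stated assumptions: read_endpoint is a hypothetical stand-in for fetching one endpoint's data (it is not a Flight API), endpoints are plain arrays, and the ordered argument plays the role of the proposed flag. A fixed number of threads drain a queue of endpoint indexes, and the results are put back into endpoint order only when ordering matters.

```ruby
# Hypothetical stand-in for reading one endpoint; not a real Flight call.
def read_endpoint(endpoint)
  endpoint.dup
end

# Read all endpoints with at most n_workers threads.
# Each result is tagged with its endpoint index so that the
# caller can restore endpoint order when the data is ordered.
def read_partitions(endpoints, n_workers:, ordered:)
  queue = Queue.new
  endpoints.each_index { |i| queue << i }
  queue.close # pop returns nil once the closed queue is drained

  results = Queue.new
  workers = Array.new(n_workers) do
    Thread.new do
      while (i = queue.pop)
        results << [i, read_endpoint(endpoints[i])]
      end
    end
  end
  workers.each(&:join)

  tagged = []
  tagged << results.pop until results.empty?
  # Completion order is arbitrary; sequence by index only if needed.
  tagged.sort_by!(&:first) if ordered
  tagged.flat_map { |_, rows| rows }
end
```

With ordered: true the output follows the endpoint list regardless of which worker finished first; with ordered: false the re-sequencing step is skipped entirely.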

On Fri, Apr 28, 2023, at 19:06, Weston Pace wrote:
> Thank you both for the extra information.  Acero couldn't actually merge
> the streams today; I was thinking more of DataFusion and Velox, which would
> often want to keep the streams separate, especially if there was some kind
> of filtering or transformation that could be applied before applying a
> sorted merge.
>
> However, I also very much agree that both scenarios are valid.  First, if
> there are a lot of partitions (e.g. far more than the # of parallelism
> units) then you probably don't want parallel paths for all of them.
>
> Second, as you said, simpler clients (e.g. those where all filtering is
> done downstream, or those that don't need any filtering at all) will
> appreciate Flight's ability to merge for them.  It makes the client more
> complex but given that clients are already doing this to some extent it
> seems worthwhile.

Re: [DISCUSS][Format][Flight] Ordered data support

Posted by Weston Pace <we...@gmail.com>.
Thank you both for the extra information.  Acero couldn't actually merge
the streams today; I was thinking more of DataFusion and Velox, which would
often want to keep the streams separate, especially if there was some kind
of filtering or transformation that could be applied before applying a
sorted merge.

However, I also very much agree that both scenarios are valid.  First, if
there are a lot of partitions (e.g. far more than the # of parallelism
units) then you probably don't want parallel paths for all of them.

Second, as you said, simpler clients (e.g. those where all filtering is
done downstream, or those that don't need any filtering at all) will
appreciate Flight's ability to merge for them.  It makes the client more
complex but given that clients are already doing this to some extent it
seems worthwhile.

On Thu, Apr 27, 2023 at 7:45 PM David Li <li...@apache.org> wrote:

> In addition to Kou's response:
>
> The individual endpoints have always represented a subset of a single
> stream of data. So each endpoint in a FlightInfo is a partition of the
> overall result set.
>
> Not all clients want to deal with reading all the Flight streams
> themselves and may want a single stream of data. (For example: ADBC exposes
> both paths. The JDBC driver also has to deal with this.) So some client
> libraries have to deal with the question of whether to read in parallel and
> whether to keep the result in order or not. A more advanced use case, like
> Acero, would probably read the endpoints itself and could use this flag to
> decide how to merge the streams.

Re: [DISCUSS][Format][Flight] Ordered data support

Posted by David Li <li...@apache.org>.
In addition to Kou's response:

The individual endpoints have always represented a subset of a single stream of data. So each endpoint in a FlightInfo is a partition of the overall result set.

Not all clients want to deal with reading all the Flight streams themselves and may want a single stream of data. (For example: ADBC exposes both paths. The JDBC driver also has to deal with this.) So some client libraries have to deal with the question of whether to read in parallel and whether to keep the result in order or not. A more advanced use case, like Acero, would probably read the endpoints itself and could use this flag to decide how to merge the streams.

On Fri, Apr 28, 2023, at 09:56, Sutou Kouhei wrote:
> Hi,
>
>> This seems of very limited value. For example, if the user desired DESC
>> order, then the endpoints would return
>> 
>> Endpoint 1: (C, B, A)
>> Endpoint 2: (F, E, D)
>
> As David said, the server returns
>
> Endpoint 2: (F, E, D)
> Endpoint 1: (C, B, A)
>
> in this case.
>
> Here is a use case I have in mind:
>
> A system has time series data. Each node in the system has
> data for one day. If a client requests "SELECT * FROM data
> WHERE server = 'server1' ORDER BY created_at DESC", the
> system returns the following:
>
> Endpoint 20230428: (DATA_FOR_2023_04_28)
> Endpoint 20230427: (DATA_FOR_2023_04_27)
> Endpoint 20230426: (DATA_FOR_2023_04_26)
> ...
>
> If we have the "ordered" flag, the client can assume that
> the received data is sorted. In other words, if the client
> reads data in the order Endpoint 20230428 -> Endpoint
> 20230427 -> Endpoint 20230426, the data it reads is sorted.
>
> If we don't have the "ordered" flag and instead say "the
> relative ordering of data from different endpoints is
> implementation defined", we can't implement a general-purpose
> Flight-based client library (a Flight SQL based client
> library, a Flight SQL based ADBC driver and so on). Such a
> client library would need code like the following:
>
>   # TODO: How to detect server_type?
>   if server_type == "DB1"
>     # DB1 returns ordered results, so read the endpoints
>     # sequentially to preserve their order.
>     endpoints.each do |endpoint|
>       yield(endpoint.read)
>     end
>   else
>     # Other DBs don't return ordered results,
>     # so we read data in parallel for performance.
>     threads = endpoints.collect do |endpoint|
>       Thread.new do
>         yield(endpoint.read)
>       end
>     end
>     threads.each do |thread|
>       thread.join
>     end
>   end
>
> The client library needs to add 'or server_type == "DB2"' to
> 'if server_type == "DB1"' when DB2 also adds support for
> ordered results. If only DB2 2.0 or later supports ordered
> results, the client library needs yet another condition: 'or
> (server_type == "DB2" and server_version >= 2.0)'.
>
> So I think that the "ordered" flag is useful.
>
>
> Thanks,
> -- 
> kou

Re: [DISCUSS][Format][Flight] Ordered data support

Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,

> This seems of very limited value. For example, if the user desired DESC
> order, then the endpoints would return
> 
> Endpoint 1: (C, B, A)
> Endpoint 2: (F, E, D)

As David said, the server returns

Endpoint 2: (F, E, D)
Endpoint 1: (C, B, A)

in this case.

Here is a use case I have in mind:

A system has time series data. Each node in the system has
data for one day. If a client requests "SELECT * FROM data
WHERE server = 'server1' ORDER BY created_at DESC", the
system returns the following:

Endpoint 20230428: (DATA_FOR_2023_04_28)
Endpoint 20230427: (DATA_FOR_2023_04_27)
Endpoint 20230426: (DATA_FOR_2023_04_26)
...

If we have the "ordered" flag, the client can assume that
the received data is sorted. In other words, if the client
reads data in the order Endpoint 20230428 -> Endpoint
20230427 -> Endpoint 20230426, the data it reads is sorted.

If we don't have the "ordered" flag and instead say "the
relative ordering of data from different endpoints is
implementation defined", we can't implement a general-purpose
Flight-based client library (a Flight SQL based client
library, a Flight SQL based ADBC driver and so on). Such a
client library would need code like the following:

  # TODO: How to detect server_type?
  if server_type == "DB1"
    # DB1 returns ordered results, so read the endpoints
    # sequentially to preserve their order.
    endpoints.each do |endpoint|
      yield(endpoint.read)
    end
  else
    # Other DBs don't return ordered results,
    # so we read data in parallel for performance.
    threads = endpoints.collect do |endpoint|
      Thread.new do
        yield(endpoint.read)
      end
    end
    threads.each do |thread|
      thread.join
    end
  end

The client library needs to add 'or server_type == "DB2"' to
'if server_type == "DB1"' when DB2 also adds support for
ordered results. If only DB2 2.0 or later supports ordered
results, the client library needs yet another condition: 'or
(server_type == "DB2" and server_version >= 2.0)'.

So I think that the "ordered" flag is useful.
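
For comparison, here is a minimal sketch of the same client logic driven by the flag instead of the server type. The FlightInfo and Endpoint structs below are hypothetical stand-ins for illustration (they are not the API of any Arrow binding), and "ordered" is the proposed flag:

```ruby
# Hypothetical stand-ins for illustration only.
Endpoint = Struct.new(:rows) do
  def read
    rows
  end
end
FlightInfo = Struct.new(:endpoints, :ordered)

# Yield each endpoint's data to the given block.
def each_result(info)
  if info.ordered
    # The server declared the data sorted in endpoint order,
    # so read sequentially to preserve that order.
    info.endpoints.each do |endpoint|
      yield(endpoint.read)
    end
  else
    # No ordering guarantee, so read in parallel for performance
    # and hand the results over in whatever order they arrived.
    results = Queue.new
    threads = info.endpoints.collect do |endpoint|
      Thread.new { results << endpoint.read }
    end
    threads.each(&:join)
    yield(results.pop) until results.empty?
  end
end
```

With endpoints listed as (F, E, D) then (C, B, A) and the flag set, the block receives the data in DESC order without any knowledge of which server produced it.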


Thanks,
-- 
kou

In <CA...@mail.gmail.com>
  "Re: [DISCUSS][Format][Flight] Ordered data support" on Thu, 27 Apr 2023 10:55:32 -0400,
  Andrew Lamb <al...@influxdata.com> wrote:

> I wonder if we have considered simply removing the statement "There is no
> ordering defined on endpoints. Hence, if the returned data has an ordering,
> it should be returned in a single endpoint." and  replacing it with
> something that says "the relative ordering of data from different endpoints
> is implementation defined"
> 
> I am struggling to come up with a concrete use case for the "ordered" flag.
> 
> The ticket references "distributed sort" but most distributed sort
> algorithms I know of would produce multiple sorted streams that need to be
> merged together. For example
> 
> Endpoint 1: (B, C, D)
> Endpoint 2: (A, E, F)
> 
> It is not clear how the "ordered" flag would help here
> 
> If the intent is somehow to signal the client it doesn't have to merge
> (e.g. with data like)
> 
> Endpoint 1: (A, B, C)
> Endpoint 2:  (D, E, F)
> 
> This seems of very limited value. For example, if the user desired DESC
> order, then the endpoints would return
> 
> Endpoint 1: (C, B, A)
> Endpoint 2: (F, E, D)
> 
> Which doesn't seem to conform to the updated definition
> 
> Andrew

Re: [DISCUSS][Format][Flight] Ordered data support

Posted by Andrew Lamb <al...@influxdata.com>.
I wonder if we have considered simply removing the statement "There is no
ordering defined on endpoints. Hence, if the returned data has an ordering,
it should be returned in a single endpoint." and  replacing it with
something that says "the relative ordering of data from different endpoints
is implementation defined"

I am struggling to come up with a concrete use case for the "ordered" flag.

The ticket references "distributed sort" but most distributed sort
algorithms I know of would produce multiple sorted streams that need to be
merged together. For example

Endpoint 1: (B, C, D)
Endpoint 2: (A, E, F)

It is not clear how the "ordered" flag would help here

If the intent is somehow to signal the client it doesn't have to merge
(e.g. with data like)

Endpoint 1: (A, B, C)
Endpoint 2:  (D, E, F)

This seems of very limited value. For example, if the user desired DESC
order, then the endpoints would return

Endpoint 1: (C, B, A)
Endpoint 2: (F, E, D)

which doesn't seem to conform to the updated definition.
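
To make the three cases above concrete, here is a minimal sketch in plain
Python. It does not use the Flight API: each endpoint is modeled as an
in-memory list, and the helper names read_ordered and read_merged are
hypothetical, chosen only for this illustration. It assumes that an "ordered"
flag would mean the client may concatenate the endpoints in the listed order.

```python
import heapq
from itertools import chain


def read_ordered(endpoints):
    """Concatenate endpoint streams in the order they are listed.

    Hypothetical helper: only yields sorted output if the data is
    already partitioned in endpoint order.
    """
    return list(chain.from_iterable(endpoints))


def read_merged(endpoints, reverse=False):
    """k-way merge of endpoint streams that are each sorted on their own.

    With reverse=True, every input must be sorted in descending order.
    """
    return list(heapq.merge(*endpoints, reverse=reverse))


# Each endpoint is sorted, but the ranges overlap: a merge is needed.
overlapping = [["B", "C", "D"], ["A", "E", "F"]]

# Endpoints are range-partitioned in endpoint order: concatenation is enough.
partitioned = [["A", "B", "C"], ["D", "E", "F"]]

# DESC within each endpoint, but endpoints still listed in ascending range order.
descending = [["C", "B", "A"], ["F", "E", "D"]]

print(read_ordered(partitioned))
print(read_merged(overlapping))
print(read_ordered(descending))
print(read_merged(descending, reverse=True))
```

Under these assumptions, concatenation alone gives a sorted result only for
the range-partitioned case. The overlapping and DESC cases still need a merge
on the client.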

Andrew


On Tue, Apr 25, 2023 at 8:56 PM Sutou Kouhei <ko...@clear-code.com> wrote:

> Hi,
>
> I would like to propose adding support for ordered data to
> Apache Arrow Flight. If anyone has comments on this
> proposal, please share them here or on the issue for this
> proposal: https://github.com/apache/arrow/issues/34852
>
> This is one of the proposals in "[DISCUSS] Flight RPC/Flight
> SQL/ADBC enhancements":
>
>   https://lists.apache.org/thread/247z3t06mf132nocngc1jkp3oqglz7jp
>
> See also the "Flight RPC: Ordered Data" section in the
> design document for the proposals:
>
>
> https://docs.google.com/document/d/1jhPyPZSOo2iy0LqIJVUs9KWPyFULVFJXTILDfkadx2g/edit#
>
> Background:
>
> Currently, the endpoints within a FlightInfo explicitly have
> no ordering.
>
> This is unnecessarily limiting. Systems can and do implement
> distributed sorts, but they can't reflect this in the
> current specification.
>
> Proposal:
>
> Add a flag to FlightInfo. If the flag is set, the client may
> assume that the data is sorted in the same order as the
> endpoints. Otherwise, the client cannot make any assumptions
> (as before).
>
> This is a compatible change because the client can just
> ignore the flag.
>
> Implementation:
>
> https://github.com/apache/arrow/pull/35178 is an
> implementation of this proposal. The pull request has the
> following:
>
> 1. Format changes:
>
> https://github.com/apache/arrow/pull/35178/files#diff-53b6c132dcc789483c879f667a1c675792b77aae9a056b257d6b20287bb09dba
>    * format/Flight.proto
>
> 2. Documentation changes:
>
> https://github.com/apache/arrow/pull/35178/files#diff-839518fb41e923de682e8587f0b6fdb00eb8f3361d360c2f7249284a136a7d89
>    * docs/source/format/Flight.rst
>
> 3. The C++ implementation and an integration test:
>    * cpp/src/arrow/flight/
>
> 4. The Java implementation and an integration test (thanks to David Li!):
>    * java/flight/
>
> 5. The Go implementation and an integration test:
>    * go/arrow/flight/
>    * go/arrow/internal/flight_integration/
>
> Next:
>
> I'll start a vote for this proposal after we reach a consensus
> on this proposal.
>
> It's the standard process for format change.
> See also:
>
> * [VOTE] Formalize how to change format
>   https://lists.apache.org/thread/jlc4wtt09rfszlzqdl55vrc4dxzscr4c
> * GH-35084: [Docs][Format] Add how to change format specification
>   https://github.com/apache/arrow/pull/35174
>
>
> Thanks,
> --
> kou
>