Posted to dev@arrow.apache.org by Sutou Kouhei <ko...@clear-code.com> on 2023/05/02 03:23:41 UTC

Re: [DISCUSS][Format][Flight] Ordered data support

Hi Andrew,

Thanks for your wording suggestions!
I've merged them.

Here is the latest document:
  http://crossbow.voltrondata.com/pr_docs/35178/format/Flight.html#downloading-data

If there are no more comments in the next few days, I'll
start a vote on this proposal.
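
For anyone skimming the thread, here is a minimal Ruby sketch of how
a client library could branch on the flag. It is only an
illustration under assumptions: FakeEndpoint, read_all and the
ordered: keyword are hypothetical names, not part of the Flight API
or of the pull request.

```ruby
# Hypothetical stand-in for a Flight endpoint; not part of any Arrow API.
FakeEndpoint = Struct.new(:name, :rows) do
  # Pretend to fetch this endpoint's partition of the result set.
  def read
    rows
  end
end

# Read every endpoint and return all rows.
# `ordered` mirrors the proposed FlightInfo flag (assumed name).
def read_all(endpoints, ordered:)
  if ordered
    # Endpoint order carries meaning: concatenate partitions in that order.
    endpoints.flat_map(&:read)
  else
    # No ordering promise: fetch in parallel and take rows as they complete.
    queue = Queue.new
    threads = endpoints.map do |endpoint|
      Thread.new { queue << endpoint.read }
    end
    threads.each(&:join)
    rows = []
    rows.concat(queue.pop) until queue.empty?
    rows
  end
end

endpoints = [
  FakeEndpoint.new("20230428", ["F", "E", "D"]),
  FakeEndpoint.new("20230427", ["C", "B", "A"]),
]
ordered_rows = read_all(endpoints, ordered: true)
unordered_rows = read_all(endpoints, ordered: false)
```

With the flag set, the sequential path keeps the DESC order across
endpoints. Without it, the parallel path only guarantees that every
row arrives, in whatever order the endpoints finish.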


Thanks,
-- 
kou

In <CA...@mail.gmail.com>
  "Re: [DISCUSS][Format][Flight] Ordered data support" on Fri, 28 Apr 2023 08:35:34 -0400,
  Andrew Lamb <al...@influxdata.com> wrote:

> Thank you for the clarification.
> 
> The point I was missing was that this flag is instructing the FlightClient
> how to present the results to the client application, rather than specific
> properties of the underlying stream.
> 
> I can see the value of returning result streams in a specific order
> (by endpoint) or also being able to retrieve the streams from the endpoints
> in any order (and potentially interleave the results from the endpoint as
> they arrive)
> 
> I left some suggestions on clarifying the wording on [1] that might help
> avoid future confusion.
> 
> Andrew
> 
> [1] https://github.com/apache/arrow/pull/35178
> 
> On Fri, Apr 28, 2023 at 1:02 AM David Li <li...@apache.org> wrote:
> 
>> For a lot of partitions - you could have a small number of threads
>> consuming a queue of partitions (and deciding whether you need to
>> sequence/renumber their outputs or not), much like what Acero does with a
>> FileSystemDataset.
>>
>> Note that the Flight client itself doesn't do any of this (perhaps it
>> should!); it's clients of Flight that have to deal with this. (...that's a
>> bit confusing)
>>
>> On Fri, Apr 28, 2023, at 19:06, Weston Pace wrote:
>> > Thank you both for the extra information.  Acero couldn't actually merge
>> > the streams today; I was thinking more of DataFusion and Velox, which would
>> > often want to keep the streams separate, especially if there was some kind
>> > of filtering or transformation that could be applied before applying a
>> > sorted merge.
>> >
>> > However, I also very much agree that both scenarios are valid.  First, if
>> > there are a lot of partitions (e.g. far more than the # of parallelism
>> > units) then you probably don't want parallel paths for all of them.
>> >
>> > Second, as you said, simpler clients (e.g. those where all filtering is
>> > done downstream, or those that don't need any filtering at all) will
>> > appreciate Flight's ability to merge for them.  It makes the client more
>> > complex but given that clients are already doing this to some extent it
>> > seems worthwhile.
>> >
>> > On Thu, Apr 27, 2023 at 7:45 PM David Li <li...@apache.org> wrote:
>> >
>> >> In addition to Kou's response:
>> >>
>> >> The individual endpoints have always represented a subset of a single
>> >> stream of data. So each endpoint in a FlightInfo is a partition of the
>> >> overall result set.
>> >>
>> >> Not all clients want to deal with reading all the Flight streams
>> >> themselves and may want a single stream of data. (For example: ADBC exposes
>> >> both paths. The JDBC driver also has to deal with this.) So some client
>> >> libraries have to deal with the question of whether to read in parallel and
>> >> whether to keep the result in order or not. A more advanced use case, like
>> >> Acero, would probably read the endpoints itself and could use this flag to
>> >> decide how to merge the streams.
>> >>
>> >> On Fri, Apr 28, 2023, at 09:56, Sutou Kouhei wrote:
>> >> > Hi,
>> >> >
>> >> >> This seems of very limited value. For example, if the user desired DESC
>> >> >> order, then the endpoint would return
>> >> >>
>> >> >> Endpoint 1: (C, B, A)
>> >> >> Endpoint 2: (F, E, D)
>> >> >
>> >> > As David said, the server returns
>> >> >
>> >> > Endpoint 2: (F, E, D)
>> >> > Endpoint 1: (C, B, A)
>> >> >
>> >> > in this case.
>> >> >
>> >> > Here is a use case I have in mind:
>> >> >
>> >> > A system has time series data. Each node in the system has
>> >> > data for one day. If a client requests "SELECT * FROM data
>> >> > WHERE server = 'server1' ORDER BY created_at DESC", the
>> >> > system returns the following:
>> >> >
>> >> > Endpoint 20230428: (DATA_FOR_2023_04_28)
>> >> > Endpoint 20230427: (DATA_FOR_2023_04_27)
>> >> > Endpoint 20230426: (DATA_FOR_2023_04_26)
>> >> > ...
>> >> >
>> >> > If we have the "ordered" flag, the client can assume that
>> >> > received data are sorted. In other words, if the client
>> >> > reads data from Endpoint 20230428 -> Endpoint 20230427 ->
>> >> > Endpoint 20230426, the data the client read is sorted.
>> >> >
>> >> > If we don't have the "ordered" flag and we use "the relative
>> >> > ordering of data from different endpoints is implementation
>> >> > defined", we can't implement a general purpose Flight based
>> >> > client library (Flight SQL based client library, Flight SQL
>> >> > based ADBC driver and so on). The client library will have
>> >> > the following code:
>> >> >
>> >> >   # TODO: How to detect server_type?
>> >> >   if server_type == "DB1"
>> >> >     # DB1 returns ordered results, so read the endpoints
>> >> >     # one by one, in the order they were returned.
>> >> >     endpoints.each do |endpoint|
>> >> >       yield(endpoint.read)
>> >> >     end
>> >> >   else
>> >> >     # Other DBs don't return ordered results.
>> >> >     # So, we read data in parallel for performance.
>> >> >     threads = endpoints.collect do |endpoint|
>> >> >       Thread.new do
>> >> >         yield(endpoint.read)
>> >> >       end
>> >> >     end
>> >> >     threads.each do |thread|
>> >> >       thread.join
>> >> >     end
>> >> >   end
>> >> >
>> >> > The client library needs to add 'or server_type == "DB2"' to
>> >> > 'if server_type == "DB1"' when DB2 also adds support for
>> >> > ordered results. If only DB2 2.0 or later supports ordered
>> >> > results, the client library needs yet another condition: 'or
>> >> > (server_type == "DB2" and server_version >= 2.0)'.
>> >> >
>> >> > So I think that the "ordered" flag is useful.
>> >> >
>> >> >
>> >> > Thanks,
>> >> > --
>> >> > kou
>> >> >
>> >> > In <CAFhtnRxzMaoqmzWPkqsLoJZW5jmx=
>> d_i9ojd9Xy1ydkgkGzVKw@mail.gmail.com>
>> >> >   "Re: [DISCUSS][Format][Flight] Ordered data support" on Thu, 27 Apr 2023 10:55:32 -0400,
>> >> >   Andrew Lamb <al...@influxdata.com> wrote:
>> >> >
>> >> >> I wonder if we have considered simply removing the statement "There is no
>> >> >> ordering defined on endpoints. Hence, if the returned data has an ordering,
>> >> >> it should be returned in a single endpoint." and replacing it with
>> >> >> something that says "the relative ordering of data from different endpoints
>> >> >> is implementation defined".
>> >> >>
>> >> >> I am struggling to come up with a concrete use case for the "ordered" flag.
>> >> >>
>> >> >> The ticket references "distributed sort" but most distributed sort
>> >> >> algorithms I know of would produce multiple sorted streams that need to be
>> >> >> merged together. For example:
>> >> >>
>> >> >> Endpoint 1: (B, C, D)
>> >> >> Endpoint 2: (A, E, F)
>> >> >>
>> >> >> It is not clear how the "ordered" flag would help here.
>> >> >>
>> >> >> If the intent is somehow to signal the client it doesn't have to merge
>> >> >> (e.g. with data like)
>> >> >>
>> >> >> Endpoint 1: (A, B, C)
>> >> >> Endpoint 2:  (D, E, F)
>> >> >>
>> >> >> This seems of very limited value. For example, if the user desired DESC
>> >> >> order, then the endpoint would return
>> >> >>
>> >> >> Endpoint 1: (C, B, A)
>> >> >> Endpoint 2: (F, E, D)
>> >> >>
>> >> >> Which doesn't seem to conform to the updated definition
>> >> >>
>> >> >> Andrew
>> >> >>
>> >> >>
>> >> >> On Tue, Apr 25, 2023 at 8:56 PM Sutou Kouhei <ko...@clear-code.com> wrote:
>> >> >>
>> >> >>> Hi,
>> >> >>>
>> >> >>> I would like to propose adding support for ordered data to
>> >> >>> Apache Arrow Flight. If anyone has comments on this
>> >> >>> proposal, please share them here or on the issue for this
>> >> >>> proposal: https://github.com/apache/arrow/issues/34852
>> >> >>>
>> >> >>> This is one of the proposals in "[DISCUSS] Flight RPC/Flight
>> >> >>> SQL/ADBC enhancements":
>> >> >>>
>> >> >>>   https://lists.apache.org/thread/247z3t06mf132nocngc1jkp3oqglz7jp
>> >> >>>
>> >> >>> See also the "Flight RPC: Ordered Data" section in the
>> >> >>> design document for the proposals:
>> >> >>>
>> >> >>>   https://docs.google.com/document/d/1jhPyPZSOo2iy0LqIJVUs9KWPyFULVFJXTILDfkadx2g/edit#
>> >> >>>
>> >> >>> Background:
>> >> >>>
>> >> >>> Currently, the endpoints within a FlightInfo explicitly have
>> >> >>> no ordering.
>> >> >>>
>> >> >>> This is unnecessarily limiting. Systems can and do implement
>> >> >>> distributed sorts, but they can't reflect this in the
>> >> >>> current specification.
>> >> >>>
>> >> >>> Proposal:
>> >> >>>
>> >> >>> Add a flag to FlightInfo. If the flag is set, the client may
>> >> >>> assume that the data is sorted in the same order as the
>> >> >>> endpoints. Otherwise, the client cannot make any assumptions
>> >> >>> (as before).
>> >> >>>
>> >> >>> This is a compatible change because the client can just
>> >> >>> ignore the flag.
>> >> >>>
>> >> >>> Implementation:
>> >> >>>
>> >> >>> https://github.com/apache/arrow/pull/35178 is an
>> >> >>> implementation of this proposal. The pull request has the
>> >> >>> following:
>> >> >>>
>> >> >>> 1. Format changes:
>> >> >>>    https://github.com/apache/arrow/pull/35178/files#diff-53b6c132dcc789483c879f667a1c675792b77aae9a056b257d6b20287bb09dba
>> >> >>>    * format/Flight.proto
>> >> >>>
>> >> >>> 2. Documentation changes:
>> >> >>>    https://github.com/apache/arrow/pull/35178/files#diff-839518fb41e923de682e8587f0b6fdb00eb8f3361d360c2f7249284a136a7d89
>> >> >>>    * docs/source/format/Flight.rst
>> >> >>>
>> >> >>> 3. The C++ implementation and an integration test:
>> >> >>>    * cpp/src/arrow/flight/
>> >> >>>
>> >> >>> 4. The Java implementation and an integration test (thanks to David Li!):
>> >> >>>    * java/flight/
>> >> >>>
>> >> >>> 5. The Go implementation and an integration test:
>> >> >>>    * go/arrow/flight/
>> >> >>>    * go/arrow/internal/flight_integration/
>> >> >>>
>> >> >>> Next:
>> >> >>>
>> >> >>> I'll start a vote for this proposal after we reach a consensus
>> >> >>> on this proposal.
>> >> >>>
>> >> >>> This is the standard process for format changes.
>> >> >>> See also:
>> >> >>>
>> >> >>> * [VOTE] Formalize how to change format
>> >> >>>   https://lists.apache.org/thread/jlc4wtt09rfszlzqdl55vrc4dxzscr4c
>> >> >>> * GH-35084: [Docs][Format] Add how to change format specification
>> >> >>>   https://github.com/apache/arrow/pull/35174
>> >> >>>
>> >> >>>
>> >> >>> Thanks,
>> >> >>> --
>> >> >>> kou
>> >> >>>
>> >>
>>