Posted to dev@arrow.apache.org by Yibo Cai <yi...@arm.com> on 2022/01/18 09:35:18 UTC

Re: Arrow in HPC

Some updates.

I tested David's UCX transport patch over a 100Gb network. FlightRPC over
UCX/RDMA improves throughput by about 50%, with lower and flatter latency.
I think there is room to improve further. See the test report [1].

For the data plane approach, the PoC shared memory data plane also
delivers a significant performance boost. Details at [2].

Glad to see there is big potential to improve FlightRPC performance.

[1] https://issues.apache.org/jira/browse/ARROW-15229
[2] https://issues.apache.org/jira/browse/ARROW-15282

On 12/30/21 11:57 PM, David Li wrote:
> Ah, I see.
> 
> I think both projects can proceed as well. At some point we will have to figure out how to merge them, but I think it's too early to see how exactly we will want to refactor things.
> 
> I looked over the code and I don't have any important comments for now. Looking forward to reviewing when it's ready.
> 
> -David
> 
> On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
>>
>>
>> On 12/29/21 11:03 PM, David Li wrote:
>>> Awesome, thanks for sharing this too!
>>>
>>> The refactoring you have with DataClientStream is what I would like to do as well - I think much of the existing code can be adapted to be more transport-agnostic, and then it will be easier to support new transports (whether data-only or for all methods).
>>>
>>> Where do you see the gaps between gRPC and this? I think what would happen is 1) client calls GetFlightInfo 2) server returns a `shm://` URI 3) client sees the unfamiliar prefix and creates a new client for the DoGet call (it would have to do this anyways if, for instance, the GetFlightInfo call returned the address of a different server).
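For concreteness, a rough C++ sketch of that three-step flow, written against the Status/out-param Flight API of that era. The assumption that Connect() can dial a non-gRPC scheme such as shm:// is exactly what these prototypes would add, so treat this as illustrative rather than working API:

#include <memory>
#include <arrow/flight/api.h>
#include <arrow/status.h>
#include <arrow/table.h>

namespace flight = arrow::flight;

arrow::Status GetViaAdvertisedLocation(flight::FlightClient* control_client,
                                       const flight::FlightDescriptor& descriptor,
                                       std::shared_ptr<arrow::Table>* out) {
  // 1) Ask the (gRPC) control-plane server where the data lives.
  std::unique_ptr<flight::FlightInfo> info;
  ARROW_RETURN_NOT_OK(control_client->GetFlightInfo(descriptor, &info));

  for (const flight::FlightEndpoint& endpoint : info->endpoints()) {
    if (endpoint.locations.empty()) continue;
    // 2) The server may advertise a non-gRPC URI (e.g. "shm://...") here.
    const flight::Location& location = endpoint.locations[0];

    // 3) The client dispatches on the URI scheme and opens a new connection
    //    for the DoGet call - exactly what it would already do if the server
    //    had returned the address of a different gRPC server.
    std::unique_ptr<flight::FlightClient> data_client;
    ARROW_RETURN_NOT_OK(flight::FlightClient::Connect(location, &data_client));

    std::unique_ptr<flight::FlightStreamReader> stream;
    ARROW_RETURN_NOT_OK(data_client->DoGet(endpoint.ticket, &stream));
    return stream->ReadAll(out);
  }
  return arrow::Status::Invalid("no endpoints returned");
}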
>>>
>>
>> I mean implementation details. Some unit tests run longer than expected
>> (the data plane times out reading from an ended stream). It looks like the
>> gRPC stream finish message is not correctly intercepted and forwarded to
>> the data plane. I don't think it's a big problem, it just needs some time
>> to debug.
>>
>>> I also wonder how this stacks up to UCX's shared memory backend (I did not test this though).
>>>
>>
>> I implemented a shared memory data plane only to verify and consolidate
>> the data plane design, as it's the easiest (and still useful) driver. I also
>> plan to implement a socket-based data plane, not useful in practice,
>> only to make sure the design works across the network. Then we can add
>> more useful drivers like UCX or DPDK (the benefit of DPDK is that it works on
>> commodity hardware, unlike UCX/RDMA, which requires expensive equipment).
>>
>>> I would like to be able to support entirely new transports for certain cases (namely browser support - though perhaps one of the gRPC proxies would suffice there), but even in that case, we could make it so that a new transport only needs to implement the data plane methods. Only having to support the data plane methods would save significant implementation effort for all non-browser cases, so I think it's a worthwhile approach.
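Purely as illustration of that "data plane only" idea, a hypothetical driver interface; all names here are invented for illustration and this is not the interface of either prototype:

#include <cstdint>
#include <string>
#include <vector>
#include <arrow/status.h>

// Stand-in for one FlightData message (metadata + body), kept deliberately simple.
struct DataPlaneMessage {
  std::vector<uint8_t> app_metadata;
  std::vector<uint8_t> body;
};

class DataPlaneDriver {
 public:
  virtual ~DataPlaneDriver() = default;
  // Open the data channel described by a URI (e.g. "shm://...") that the
  // gRPC control plane handed back.
  virtual arrow::Status Connect(const std::string& uri) = 0;
  // Stream FlightData in either direction for DoGet/DoPut/DoExchange;
  // GetFlightInfo, ListFlights, actions, auth, etc. never touch this interface.
  virtual arrow::Status Write(const DataPlaneMessage& msg) = 0;
  virtual arrow::Status Read(DataPlaneMessage* out) = 0;
  virtual arrow::Status Close() = 0;
};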
>>>
>>
>> Thanks for being interested in this approach. My current plan is to first
>> refactor the shared memory data plane to verify it beats gRPC in local RPC
>> by a considerable margin; otherwise there must be big mistakes in my
>> design. After that I will fix the unit test issues and submit it for
>> community review.
>>
>> Anyway, don't let me block your implementations. And if you think it's
>> useful, I can push the current code for more detailed discussion.
>>
>>> -David
>>>
>>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
>>>> Thanks, David, for initiating the UCX integration - great work!
>>>> I think a 5Gbps network is too limited for performance evaluation. I will try the patch on a 100Gb RDMA network; hopefully we can see some improvements.
>>>> I once benchmarked Flight over a 100Gb network [1]: gRPC-based throughput is 2.4GB/s for one thread and 8.8GB/s for six threads, with about 60us latency. I also benchmarked raw RDMA performance (same batch sizes as Flight); one thread can achieve 9GB/s with 12us latency. Of course the comparison is not fair. With David's patch, we can get a more realistic comparison.
>>>>
>>>> I'm implementing a data plane approach in the hope that we can adopt new data acceleration methods easily. My approach is to replace only the FlightData transmission of DoGet/Put/Exchange with data plane drivers, while gRPC is still used for all RPC calls.
>>>> Code is at my GitHub repo [2]. Besides the framework, I have just implemented a shared memory data plane driver as a PoC. The Get/Put/Exchange unit tests pass, TestCancel hangs, and some unit tests run longer than expected; still debugging. The shared memory data plane performance is pretty bad now, due to repeated map/unmap for each read/write; pre-allocated pages should improve this a lot, still experimenting.
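For readers unfamiliar with the cost being described, an illustrative POSIX sketch (not the prototype's code) contrasting per-batch map/unmap with a pre-allocated mapping; all names are made up:

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <cstddef>
#include <cstring>
#include <string>

// Per-batch variant: open, size, map, copy, unmap - repeated for every batch.
bool WriteBatchOnce(const std::string& name, const void* data, size_t size) {
  int fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0600);
  if (fd < 0) return false;
  if (ftruncate(fd, static_cast<off_t>(size)) != 0) { close(fd); return false; }
  void* addr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);
  if (addr == MAP_FAILED) return false;
  std::memcpy(addr, data, size);
  return munmap(addr, size) == 0;
}

// Pre-allocated variant: map one large segment once, then hand out offsets
// so the hot path is a plain memcpy with no syscalls.
class ShmArena {
 public:
  bool Init(const std::string& name, size_t capacity) {
    int fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0600);
    if (fd < 0) return false;
    if (ftruncate(fd, static_cast<off_t>(capacity)) != 0) { close(fd); return false; }
    base_ = mmap(nullptr, capacity, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);
    if (base_ == MAP_FAILED) return false;
    capacity_ = capacity;
    return true;
  }
  // Copy a batch into the mapped region; real code would recycle space.
  void* WriteBatch(const void* data, size_t size) {
    if (used_ + size > capacity_) return nullptr;
    void* dst = static_cast<char*>(base_) + used_;
    std::memcpy(dst, data, size);
    used_ += size;
    return dst;
  }

 private:
  void* base_ = nullptr;
  size_t capacity_ = 0;
  size_t used_ = 0;
};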
>>>>
>>>> Would like to hear community comments.
>>>>
>>>> My personal opinion is that the data plane approach reuses the gRPC control plane and may make it easier to add new data acceleration methods, but it needs to fit into gRPC seamlessly (there are still unresolved gaps). A new transport requires much more initial effort, but may pay off later. And it looks like these two approaches don't conflict with each other.
>>>>
>>>> [1] Test environment
>>>> nics: mellanox connectx5
>>>> hosts: client (neoverse n1), server (xeon gold 5218)
>>>> os: ubuntu 20.04, linux kernel 5.4
>>>> test case: 128k batch size, DoGet
>>>>
>>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
>>>>
>>>> ________________________________
>>>> From: David Li <li...@apache.org>
>>>> Sent: Wednesday, December 29, 2021 3:09 AM
>>>> To: dev@arrow.apache.org <de...@arrow.apache.org>
>>>> Subject: Re: Arrow in HPC
>>>>
>>>> I ended up drafting an implementation of Flight based on UCX, and doing some
>>>> of the necessary refactoring to support additional backends in the future.
>>>> It can run the Flight benchmark, and performance is about comparable to
>>>> gRPC, as tested on AWS EC2.
>>>>
>>>> The implementation is based on the UCP streams API. It's extremely
>>>> bare-bones and is really only a proof of concept; a good amount of work is
>>>> needed to turn it into a usable implementation. I had hoped it would perform
>>>> markedly better than gRPC, at least in this early test, but this seems not
>>>> to be the case. That said: I am likely not using UCX properly, UCX would
>>>> still open up support for additional hardware, and this work should allow
>>>> other backends to be implemented more easily.
>>>>
>>>> The branch can be viewed at
>>>> https://github.com/lidavidm/arrow/tree/flight-ucx
>>>>
>>>> I've attached the benchmark output at the end.
>>>>
>>>> There are still quite a few TODOs and things that need investigating:
>>>>
>>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at that.
>>>> - Concurrent requests are not supported (not even making more than one
>>>>     request on a connection), nor does the server support concurrent clients.
>>>>     We also need to decide whether to even support concurrent requests, and
>>>>     how (e.g. pooling multiple connections, or implementing a gRPC/HTTP2 style
>>>>     protocol, or even possibly implementing HTTP2).
>>>> - We need to make sure we properly handle errors, etc. everywhere.
>>>> - Are we using UCX in a performant and idiomatic manner? Will the
>>>>     implementation work well on RDMA and other specialized hardware?
>>>> - Do we also need to support the UCX tag API?
>>>> - Can we refactor out interfaces that allow sharing more of the
>>>>     client/server implementation between different backends?
>>>> - Are the abstractions sufficient to support other potential backends like
>>>>     MPI, libfabrics, or WebSockets?
>>>>
>>>> If anyone has experience with UCX, I'd appreciate any feedback. Otherwise,
>>>> I'm hoping to plan out and try to tackle some of the TODOs above, and figure
>>>> out how this effort can proceed.
>>>>
>>>> Antoine/Micah raised the possibility of extending gRPC instead. That would
>>>> be preferable, frankly, given that otherwise we might have to re-implement a
>>>> lot of what gRPC and HTTP2 provide by ourselves. However, the necessary
>>>> proposal stalled and was dropped without much discussion:
>>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
>>>>
>>>> Benchmark results (also uploaded at
>>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
>>>>
>>>> Testing was done between two t3.xlarge instances in the same zone.
>>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
>>>>
>>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
>>>> Testing method: DoGet
>>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>>> Number of perf runs: 1
>>>> Number of concurrent gets/puts: 1
>>>> Batch size: 131072
>>>> Batches read: 10000
>>>> Bytes read: 1310720000
>>>> Nanos: 2165862969
>>>> Speed: 577.137 MB/s
>>>> Throughput: 4617.1 batches/s
>>>> Latency mean: 214 us
>>>> Latency quantile=0.5: 209 us
>>>> Latency quantile=0.95: 340 us
>>>> Latency quantile=0.99: 409 us
>>>> Latency max: 6350 us
>>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
>>>> Testing method: DoGet
>>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>>> Number of perf runs: 1
>>>> Number of concurrent gets/puts: 1
>>>> Batch size: 2097152
>>>> Batches read: 10000
>>>> Bytes read: 20971520000
>>>> Nanos: 34184175236
>>>> Speed: 585.066 MB/s
>>>> Throughput: 292.533 batches/s
>>>> Latency mean: 3415 us
>>>> Latency quantile=0.5: 3408 us
>>>> Latency quantile=0.95: 3549 us
>>>> Latency quantile=0.99: 3800 us
>>>> Latency max: 20236 us
>>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
>>>> Testing method: DoGet
>>>> Using standalone TCP server
>>>> Server host: 172.31.34.4
>>>> Server port: 31337
>>>> Number of perf runs: 1
>>>> Number of concurrent gets/puts: 1
>>>> Batch size: 131072
>>>> Batches read: 10000
>>>> Bytes read: 1310720000
>>>> Nanos: 2375289668
>>>> Speed: 526.252 MB/s
>>>> Throughput: 4210.01 batches/s
>>>> Latency mean: 235 us
>>>> Latency quantile=0.5: 203 us
>>>> Latency quantile=0.95: 328 us
>>>> Latency quantile=0.99: 1377 us
>>>> Latency max: 17860 us
>>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
>>>> Testing method: DoGet
>>>> Using standalone TCP server
>>>> Server host: 172.31.34.4
>>>> Server port: 31337
>>>> Number of perf runs: 1
>>>> Number of concurrent gets/puts: 1
>>>> Batch size: 2097152
>>>> Batches read: 10000
>>>> Bytes read: 20971520000
>>>> Nanos: 34202704498
>>>> Speed: 584.749 MB/s
>>>> Throughput: 292.375 batches/s
>>>> Latency mean: 3416 us
>>>> Latency quantile=0.5: 3406 us
>>>> Latency quantile=0.95: 3548 us
>>>> Latency quantile=0.99: 3764 us
>>>> Latency max: 17086 us
>>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z -l 1M
>>>> Connecting to host 172.31.34.4, port 1337
>>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
>>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
>>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
>>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
>>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
>>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
>>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
>>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
>>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
>>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
>>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
>>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
>>>> - - - - - - - - - - - - - - - - - - - - - - - - -
>>>> [ ID] Interval           Transfer     Bitrate         Retr
>>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36             sender
>>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                  receiver
>>>>
>>>> iperf Done.
>>>>
>>>> Best,
>>>> David
>>>>
>>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
>>>>> "David Li" <li...@apache.org> writes:
>>>>>
>>>>>> Thanks for the clarification Yibo, looking forward to the results. Even if it is a very hacky PoC it will be interesting to see how it affects performance, though as Keith points out there are benefits in general to UCX (or similar library), and we can work out the implementation plan from there.
>>>>>>
>>>>>> To Benson's point - the work done to get UCX supported would pave the way to supporting other backends as well. I'm personally not familiar with UCX, MPI, etc. so is MPI here more about playing well with established practices or does it also offer potential hardware support/performance improvements like UCX would?
>>>>>
>>>>> There are two main implementations of MPI, MPICH and Open MPI, both of which are permissively licensed open source community projects. Both have direct support for UCX and unless your needs are very specific, the overhead of going through MPI is likely to be negligible. Both also have proprietary derivatives, such as Cray MPI (MPICH derivative) and Spectrum MPI (Open MPI derivative), which may have optimizations for proprietary networks. Both MPICH and Open MPI can be built without UCX, and this is often easier (UCX 'master' is more volatile in my experience).
>>>>>
>>>>> The vast majority of distributed memory scientific applications use MPI or higher level libraries, rather than writing directly to UCX (which provides less coverage of HPC networks). I think MPI compatibility is important.
>>>>>
>>>>>   From way up-thread (sorry):
>>>>>
>>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As another
>>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with the HPC
>>>>>>>>>>>> space.
>>>>>
>>>>> MPI has collective operations like MPI_Allreduce (perform a reduction and give every process the result; these run in log(P) or better time with small constants -- 15 microseconds is typical for a cheap reduction operation on a million processes). MPI supports user-defined operations for reductions and prefix-scan operations. If we defined MPI_Ops for Arrow types, we could compute summary statistics and other algorithmic building blocks fast at arbitrary scale.
>>>>>
>>>>> The collective execution model might not be everyone's bag, but MPI_Op can also be used in one-sided operations (MPI_Accumulate and MPI_Fetch_and_op) and dropping into collective mode has big advantages for certain algorithms in computational statistics/machine learning.
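As a minimal illustration of Jed's point, here is a plain-MPI sketch that registers a user-defined MPI_Op over a hypothetical {sum, count} partial aggregate and combines it with MPI_Allreduce. This is standard MPI usage, not an actual Arrow/MPI integration; the SumCount type just stands in for a column's partial summary statistics.

#include <mpi.h>
#include <cstddef>
#include <cstdint>

struct SumCount { double sum; std::int64_t count; };

// Combine function invoked by MPI at each step of the reduction tree.
void CombineSumCount(void* invec, void* inoutvec, int* len, MPI_Datatype*) {
  auto* in = static_cast<SumCount*>(invec);
  auto* inout = static_cast<SumCount*>(inoutvec);
  for (int i = 0; i < *len; ++i) {
    inout[i].sum += in[i].sum;
    inout[i].count += in[i].count;
  }
}

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);

  // Describe SumCount to MPI and register the reduction operator.
  MPI_Datatype sum_count_type;
  int block_lengths[2] = {1, 1};
  MPI_Aint displs[2] = {offsetof(SumCount, sum), offsetof(SumCount, count)};
  MPI_Datatype types[2] = {MPI_DOUBLE, MPI_INT64_T};
  MPI_Type_create_struct(2, block_lengths, displs, types, &sum_count_type);
  MPI_Type_commit(&sum_count_type);

  MPI_Op sum_count_op;
  MPI_Op_create(&CombineSumCount, /*commute=*/1, &sum_count_op);

  // Each rank contributes its local partial aggregate...
  SumCount local{/*sum=*/1.5, /*count=*/10};
  SumCount global{};
  // ...and every rank gets the combined result in roughly log(P) time.
  MPI_Allreduce(&local, &global, 1, sum_count_type, sum_count_op, MPI_COMM_WORLD);

  MPI_Op_free(&sum_count_op);
  MPI_Type_free(&sum_count_type);
  MPI_Finalize();
  return 0;
}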
>>>>>
>>>>
>>>
>>
> 

Re: Arrow in HPC

Posted by Gavin Ray <ra...@gmail.com>.
Congrats!

On Thu, Apr 7, 2022 at 1:35 PM David Li <li...@apache.org> wrote:

> Just as an update: thanks to Yibo for the reviews; we've merged an initial
> implementation that will be available in Arrow 8.0.0 (if built from
> source). There's definitely more work to do:
>
> ARROW-10787 [C++][Flight] DoExchange doesn't support dictionary replacement
> ARROW-15756 [C++][FlightRPC] Benchmark in-process Flight performance
> ARROW-15835 [C++][FlightRPC] Refactor auth, middleware into the
> transport-agnostic layer
> ARROW-15836 [C++][FlightRPC] Refactor remaining methods into
> transport-agnostic handlers
> ARROW-16069 [C++][FlightRPC] Refactor error statuses/codes into the
> transport-agnostic layer
> ARROW-16124 [C++][FlightRPC] UCX server should be able to shed load
> ARROW-16125 [C++][FlightRPC] Implement shutdown with deadline for UCX
> ARROW-16126 [C++][FlightRPC] Pipeline memory allocation/registration
> ARROW-16127 [C++][FlightRPC] Improve concurrent call implementation in UCX
> client
> ARROW-16135 [C++][FlightRPC] Investigate TSAN with gRPC/UCX tests
>
> However it should be usable, and any feedback from intrepid users would be
> very welcome.
>
> On Fri, Mar 18, 2022, at 14:45, David Li wrote:
> > For anyone interested, the PR is finally up and ready:
> > https://github.com/apache/arrow/pull/12442
> >
> > As part of this, Flight in C++ was refactored to allow plugging in
> > alternative transports. There's more work to be done there (auth,
> > middleware, etc. need to be uplifted into the common layer), but this
> > should enable UCX and potentially other network transports.
> >
> > There's still some caveats as described in the PR itself, including
> > some edge cases I need to track down and missing support for a variety
> > of features, but the core data plane methods are supported and the
> > Flight benchmark can be run.
> >
> > Thanks to Yibo Cai, Pavel Shamis, Antoine Pitrou (among others) for
> > assistance and review, and the HPC Advisory Council for granting access
> > to an HPC cluster to help with development and testing.
> >
> > On Tue, Jan 18, 2022, at 18:33, David Li wrote:
> >> Ah, yes, thanks for the reminder. That's one of the things that needs
> >> to be addressed for sure.
> >>
> >> -David
> >>
> >> On Tue, Jan 18, 2022, at 17:48, Supun Kamburugamuve wrote:
> >>> One general observation: I think this implementation uses polling to
> >>> check progress. Because of the client-server semantics of Arrow Flight,
> >>> you may need to use interrupt-based polling (e.g. epoll) to avoid busy
> >>> looping.
> >>>
> >>> Best,
> >>> Supun..
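For reference, a minimal sketch of the interrupt-driven pattern Supun describes, using UCX's wakeup API (ucp_worker_get_efd / ucp_worker_arm) instead of spinning on ucp_worker_progress(). It assumes a worker created with UCP_FEATURE_WAKEUP and leaves out all error handling and shutdown; purely illustrative.

#include <ucp/api/ucp.h>
#include <sys/epoll.h>

void ProgressLoop(ucp_worker_h worker) {
  int ucx_fd = -1;
  ucp_worker_get_efd(worker, &ucx_fd);

  int epoll_fd = epoll_create1(0);
  struct epoll_event ev = {};
  ev.events = EPOLLIN;
  ev.data.fd = ucx_fd;
  epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ucx_fd, &ev);

  for (;;) {
    // Drain all available work without sleeping.
    while (ucp_worker_progress(worker) != 0) {
    }
    // Re-arm the worker; UCS_ERR_BUSY means new events arrived in the
    // meantime, so go progress again instead of blocking.
    if (ucp_worker_arm(worker) == UCS_ERR_BUSY) {
      continue;
    }
    // Nothing pending: block until the UCX event fd becomes readable.
    struct epoll_event out;
    epoll_wait(epoll_fd, &out, 1, /*timeout_ms=*/-1);
  }
}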
> >>>
> >>> On Tue, Jan 18, 2022 at 8:13 AM David Li <li...@apache.org> wrote:
> >>>
> >>> > Thanks for those results, Yibo! Looks like there's still more room for
> >>> > improvement here. Yes, things are a little unstable, though I didn't
> >>> > have that much trouble just starting the benchmark - I will need
> >>> > to find suitable hardware and iron out these issues. Note that I've
> >>> > only implemented DoGet, and I haven't implemented concurrent streams,
> >>> > which would explain why most benchmark configurations hang or error.
> >>> >
> >>> > Since the last time, I've rewritten the prototype to use UCX's "active
> >>> > message" functionality instead of trying to implement messages over
> >>> > the "streams" API. This simplified the code. I also did some
> >>> > refactoring along the lines of Yibo's prototype to share more code between
> >>> > numbers:
> >>> >
> >>> > For IPC (server/client on the same machine): UCX with shared memory
> >>> > handily beats gRPC here. UCX with TCP isn't quite up to par, though.
> >>> >
> >>> > gRPC:
> >>> > 128KiB batches: 4463 MiB/s
> >>> > 2MiB batches:   3537 MiB/s
> >>> > 32MiB batches:  1828 MiB/s
> >>> >
> >>> > UCX (shared memory):
> >>> > 128KiB batches: 6500 MiB/s
> >>> > 2MiB batches:  13879 MiB/s
> >>> > 32MiB batches:  9045 MiB/s
> >>> >
> >>> > UCX (TCP):
> >>> > 128KiB batches: 1069 MiB/s
> >>> > 2MiB batches:   1735 MiB/s
> >>> > 32MiB batches:  1602 MiB/s
> >>> >
> >>> > For RPC (server/client on different machines): Two t3.xlarge (4 core,
> >>> > 16 thread) machines were used in AWS EC2. These have "up to" 5Gbps
> >>> > bandwidth. This isn't really a scenario where UCX is expected to
> >>> > shine; however, UCX performs comparably to gRPC here.
> >>> >
> >>> > gRPC:
> >>> > 128 KiB batches: 554 MiB/s
> >>> > 2 MiB batches:   575 MiB/s
> >>> >
> >>> > UCX:
> >>> > 128 KiB batches: 546 MiB/s
> >>> > 2 MiB batches:   567 MiB/s
> >>> >
> >>> > Raw test logs can be found here:
> >>> > https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc
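For anyone curious what the "active message" rewrite boils down to, a minimal sketch of the UCX calls involved (ucp_am_send_nbx on the sender, ucp_worker_set_am_recv_handler on the receiver). Endpoint/worker setup and the Flight framing on top are omitted, the AM id is arbitrary, and this is not the branch's actual code.

#include <ucp/api/ucp.h>
#include <cstdio>

constexpr unsigned kFlightDataAmId = 0x10;  // arbitrary message id

// Receiver: invoked by UCX when an active message with our id arrives.
ucs_status_t OnFlightData(void* arg, const void* header, size_t header_length,
                          void* data, size_t length,
                          const ucp_am_recv_param_t* param) {
  std::printf("got %zu header bytes, %zu payload bytes\n", header_length, length);
  return UCS_OK;  // payload consumed inside the callback
}

void RegisterHandler(ucp_worker_h worker) {
  ucp_am_handler_param_t param = {};
  param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID |
                     UCP_AM_HANDLER_PARAM_FIELD_CB;
  param.id = kFlightDataAmId;
  param.cb = OnFlightData;
  ucp_worker_set_am_recv_handler(worker, &param);
}

// Sender: ship one buffer as an active message and wait for completion.
void SendBuffer(ucp_worker_h worker, ucp_ep_h ep, const void* buf, size_t len) {
  ucp_request_param_t param = {};  // defaults: contiguous bytes, no callback
  ucs_status_ptr_t req =
      ucp_am_send_nbx(ep, kFlightDataAmId, /*header=*/nullptr, 0, buf, len, &param);
  if (UCS_PTR_IS_PTR(req)) {
    while (ucp_request_check_status(req) == UCS_INPROGRESS) {
      ucp_worker_progress(worker);
    }
    ucp_request_free(req);
  }
}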
> >>> >
> >>> > For IPC, the shared memory results are promising in that it could be
> >>> > feasible to expose a library purely over Flight without worrying about
> >>> > FFI bindings. Also, it seems results are roughly comparable to what
> >>> > Yibo observed in ARROW-15282 [1] meaning UCX will get us both a
> >>> > performant shared memory transport and support for more exotic
> >>> > hardware.
> >>> >
> >>> > There's still much work to be done; at this point, I'd like to start
> >>> > implementing the rest of the Flight methods, fixing up the many TODOs
> >>> > scattered around, trying to refactor more things to share code between
> >>> > gRPC/UCX, and find and benchmark some hardware that UCX has a fast
> >>> > path for.
> >>> >
> >>> > [1]: https://issues.apache.org/jira/browse/ARROW-15282
> >>> >
> >>> > -David
> >>> >
> >>>
> >>>
> >>> --
> >>> Supun Kamburugamuve
> >>>
>

Re: Arrow in HPC

Posted by David Li <li...@apache.org>.
Just as an update: thanks to Yibo for the reviews; we've merged an initial implementation that will be available in Arrow 8.0.0 (if built from source). There's definitely more work to do:

ARROW-10787 [C++][Flight] DoExchange doesn't support dictionary replacement
ARROW-15756 [C++][FlightRPC] Benchmark in-process Flight performance
ARROW-15835 [C++][FlightRPC] Refactor auth, middleware into the transport-agnostic layer
ARROW-15836 [C++][FlightRPC] Refactor remaining methods into transport-agnostic handlers
ARROW-16069 [C++][FlightRPC] Refactor error statuses/codes into the transport-agnostic layer
ARROW-16124 [C++][FlightRPC] UCX server should be able to shed load
ARROW-16125 [C++][FlightRPC] Implement shutdown with deadline for UCX
ARROW-16126 [C++][FlightRPC] Pipeline memory allocation/registration
ARROW-16127 [C++][FlightRPC] Improve concurrent call implementation in UCX client
ARROW-16135 [C++][FlightRPC] Investigate TSAN with gRPC/UCX tests

However it should be usable, and any feedback from intrepid users would be very welcome.

On Fri, Mar 18, 2022, at 14:45, David Li wrote:
> For anyone interested, the PR is finally up and ready: 
> https://github.com/apache/arrow/pull/12442
>
> As part of this, Flight in C++ was refactored to allow plugging in 
> alternative transports. There's more work to be done there (auth, 
> middleware, etc. need to be uplifted into the common layer), but this 
> should enable UCX and potentially other network transports.
>
> There's still some caveats as described in the PR itself, including 
> some edge cases I need to track down and missing support for a variety 
> of features, but the core data plane methods are supported and the 
> Flight benchmark can be run.
>
> Thanks to Yibo Cai, Pavel Shamis, Antoine Pitrou (among others) for 
> assistance and review, and the HPC Advisory Council for granting access 
> to an HPC cluster to help with development and testing.
>
> On Tue, Jan 18, 2022, at 18:33, David Li wrote:
>> Ah, yes, thanks for the reminder. That's one of the things that needs 
>> to be addressed for sure.
>>
>> -David
>>
>> On Tue, Jan 18, 2022, at 17:48, Supun Kamburugamuve wrote:
>>> One general observation: I think this implementation uses polling to
>>> check progress. Because of the client-server semantics of Arrow Flight,
>>> you may need to use interrupt-based polling (e.g. epoll) to avoid busy
>>> looping.
>>> 
>>> Best,
>>> Supun..
>>> 
>>> On Tue, Jan 18, 2022 at 8:13 AM David Li <li...@apache.org> wrote:
>>> 
>>> > Thanks for those results, Yibo! Looks like there's still more room for
>>> > improvement here. Yes, things are a little unstable, though I didn't
>>> > have that much trouble just starting the benchmark - I will need
>>> > to find suitable hardware and iron out these issues. Note that I've
>>> > only implemented DoGet, and I haven't implemented concurrent streams,
>>> > which would explain why most benchmark configurations hang or error.
>>> >
>>> > Since the last time, I've rewritten the prototype to use UCX's "active
>>> > message" functionality instead of trying to implement messages over
>>> > the "streams" API. This simplified the code. I also did some
>>> > refactoring along the lines of Yibo's prototype to share more code
>>> > between the gRPC and UCX implementations. Here are some benchmark
>>> > numbers:
>>> >
>>> > For IPC (server/client on the same machine): UCX with shared memory
>>> > handily beats gRPC here. UCX with TCP isn't quite up to par, though.
>>> >
>>> > gRPC:
>>> > 128KiB batches: 4463 MiB/s
>>> > 2MiB batches:   3537 MiB/s
>>> > 32MiB batches:  1828 MiB/s
>>> >
>>> > UCX (shared memory):
>>> > 128KiB batches: 6500 MiB/s
>>> > 2MiB batches:  13879 MiB/s
>>> > 32MiB batches:  9045 MiB/s
>>> >
>>> > UCX (TCP):
>>> > 128KiB batches: 1069 MiB/s
>>> > 2MiB batches:   1735 MiB/s
>>> > 32MiB batches:  1602 MiB/s
>>> >
>>> > For RPC (server/client on different machines): Two t3.xlarge (4 core,
>>> > 16 thread) machines were used in AWS EC2. These have "up to" 5Gbps
>>> > bandwidth. This isn't really a scenario where UCX is expected to
>>> > shine; however, UCX performs comparably to gRPC here.
>>> >
>>> > gRPC:
>>> > 128 KiB batches: 554 MiB/s
>>> > 2 MiB batches:   575 MiB/s
>>> >
>>> > UCX:
>>> > 128 KiB batches: 546 MiB/s
>>> > 2 MiB batches:   567 MiB/s
>>> >
>>> > Raw test logs can be found here:
>>> > https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc
>>> >
>>> > For IPC, the shared memory results are promising in that it could be
>>> > feasible to expose a library purely over Flight without worrying about
>>> > FFI bindings. Also, it seems results are roughly comparable to what
>>> > Yibo observed in ARROW-15282 [1] meaning UCX will get us both a
>>> > performant shared memory transport and support for more exotic
>>> > hardware.
>>> >
>>> > There's still much work to be done; at this point, I'd like to start
>>> > implementing the rest of the Flight methods, fixing up the many TODOs
>>> > scattered around, trying to refactor more things to share code between
>>> > gRPC/UCX, and find and benchmark some hardware that UCX has a fast
>>> > path for.
>>> >
>>> > [1]: https://issues.apache.org/jira/browse/ARROW-15282
>>> >
>>> > -David
>>> >
>>> > On Tue, Jan 18, 2022, at 04:35, Yibo Cai wrote:
>>> > > Some updates.
>>> > >
>>> > > I tested David's UCX transport patch over a 100Gb network. FlightRPC over
>>> > > UCX/RDMA improves throughput by about 50%, with lower and flatter latency.
>>> > > I think there is room to improve further. See the test report [1].
>>> > >
>>> > > For the data plane approach, the PoC shared memory data plane also
>>> > > delivers a significant performance boost. Details at [2].
>>> > >
>>> > > Glad to see there is big potential to improve FlightRPC performance.
>>> > >
>>> > > [1] https://issues.apache.org/jira/browse/ARROW-15229
>>> > > [2] https://issues.apache.org/jira/browse/ARROW-15282
>>> > >
>>> > > On 12/30/21 11:57 PM, David Li wrote:
>>> > > > Ah, I see.
>>> > > >
>>> > > > I think both projects can proceed as well. At some point we will have
>>> > to figure out how to merge them, but I think it's too early to see how
>>> > exactly we will want to refactor things.
>>> > > >
>>> > > > I looked over the code and I don't have any important comments for
>>> > now. Looking forward to reviewing when it's ready.
>>> > > >
>>> > > > -David
>>> > > >
>>> > > > On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
>>> > > >>
>>> > > >>
>>> > > >> On 12/29/21 11:03 PM, David Li wrote:
>>> > > >>> Awesome, thanks for sharing this too!
>>> > > >>>
>>> > > >>> The refactoring you have with DataClientStream is what I would like to
>>> > do as well - I think much of the existing code can be adapted to be more
>>> > transport-agnostic and then it will be easier to support new transports
>>> > (whether data-only or for all methods).
>>> > > >>>
>>> > > >>> Where do you see the gaps between gRPC and this? I think what would
>>> > happen is 1) client calls GetFlightInfo 2) server returns a `shm://` URI 3)
>>> > client sees the unfamiliar prefix and creates a new client for the DoGet
>>> > call (it would have to do this anyways if, for instance, the GetFlightInfo
>>> > call returned the address of a different server).
>>> > > >>>
>>> > > >>
>>> > > >> I mean implementation details. Some unit tests run longer than expected
>>> > > >> (the data plane times out reading from an ended stream). It looks like the
>>> > > >> gRPC stream finish message is not correctly intercepted and forwarded to
>>> > > >> the data plane. I don't think it's a big problem, it just needs some time
>>> > > >> to debug.
>>> > > >>
>>> > > >>> I also wonder how this stacks up to UCX's shared memory backend (I
>>> > did not test this though).
>>> > > >>>
>>> > > >>
>>> > > >> I implemented a shared memory data plane only to verify and consolidate
>>> > > >> the data plane design, as it's the easiest (and still useful) driver. I
>>> > > >> also plan to implement a socket-based data plane, not useful in practice,
>>> > > >> only to make sure the design works across the network. Then we can add
>>> > > >> more useful drivers like UCX or DPDK (the benefit of DPDK is that it works
>>> > > >> on commodity hardware, unlike UCX/RDMA, which requires expensive equipment).
>>> > > >>
>>> > > >>> I would like to be able to support entirely new transports for certain
>>> > cases (namely browser support - though perhaps one of the gRPC proxies
>>> > would suffice there), but even in that case, we could make it so that a new
>>> > transport only needs to implement the data plane methods. Only having to
>>> > support the data plane methods would save significant implementation effort
>>> > for all non-browser cases so I think it's a worthwhile approach.
>>> > > >>>
>>> > > >>
>>> > > >> Thanks for being interested in this approach. My current plan is to first
>>> > > >> refactor the shared memory data plane to verify it beats gRPC in local RPC
>>> > > >> by a considerable margin; otherwise there must be big mistakes in my
>>> > > >> design. After that I will fix the unit test issues and submit it for
>>> > > >> community review.
>>> > > >>
>>> > > >> Anyway, don't let me block your implementations. And if you think it's
>>> > > >> useful, I can push the current code for more detailed discussion.
>>> > > >>
>>> > > >>> -David
>>> > > >>>
>>> > > >>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
>>> > > >>>> Thanks, David, for initiating the UCX integration - great work!
>>> > > >>>> I think 5Gbps network is too limited for performance evaluation. I
>>> > will try the patch on 100Gb RDMA network, hopefully we can see some
>>> > improvements.
>>> > > >>>> I once benchmarked flight over 100Gb network [1], grpc based
>>> > throughput is 2.4GB/s for one thread, 8.8GB/s for six threads, about 60us
>>> > latency. I also benchmarked raw RDMA performance (same batch sizes as
>>> > flight), one thread can achieve 9GB/s with 12us latency. Of course the
>>> > comparison is not fair. With David's patch, we can get a more realistic
>>> > comparison.
>>> > > >>>>
>>> > > >>>> I'm implementing a data plane approach in the hope that we can adopt new
>>> > data acceleration methods easily. My approach is to replace only the
>>> > FlightData transmission of DoGet/Put/Exchange with data plane drivers,
>>> > while gRPC is still used for all RPC calls.
>>> > > >>>> Code is at my github repo [2]. Besides the framework, I just
>>> > implemented a shared memory data plane driver as PoC. Get/Put/Exchange unit
>>> > tests passed, TestCancel hangs, some unit tests run longer than expected,
>>> > still debugging. The shared memory data plane performance is pretty bad
>>> > now, due to repeated map/unmap for each read/write, pre-allocated pages
>>> > should improve much, still experimenting.
>>> > > >>>>
>>> > > >>>> Would like to hear community comments.
>>> > > >>>>
>>> > > >>>> My personal opinion is that the data plane approach reuses the gRPC control
>>> > plane and may make it easier to add new data acceleration methods, but it needs
>>> > to fit into gRPC seamlessly (there are still unresolved gaps). A new transport
>>> > requires much more initial effort, but may pay off later. And it looks like
>>> > these two approaches don't conflict with each other.
>>> > > >>>>
>>> > > >>>> [1] Test environment
>>> > > >>>> nics: mellanox connectx5
>>> > > >>>> hosts: client (neoverse n1), server (xeon gold 5218)
>>> > > >>>> os: ubuntu 20.04, linux kernel 5.4
>>> > > >>>> test case: 128k batch size, DoGet
>>> > > >>>>
>>> > > >>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
>>> > > >>>>
>>> > > >>>> ________________________________
>>> > > >>>> From: David Li <li...@apache.org>
>>> > > >>>> Sent: Wednesday, December 29, 2021 3:09 AM
>>> > > >>>> To: dev@arrow.apache.org <de...@arrow.apache.org>
>>> > > >>>> Subject: Re: Arrow in HPC
>>> > > >>>>
>>> > > >>>> I ended up drafting an implementation of Flight based on UCX, and
>>> > doing some
>>> > > >>>> of the necessary refactoring to support additional backends in the
>>> > future.
>>> > > >>>> It can run the Flight benchmark, and performance is about
>>> > comparable to
>>> > > >>>> gRPC, as tested on AWS EC2.
>>> > > >>>>
>>> > > >>>> The implementation is based on the UCP streams API. It's extremely
>>> > > >>>> bare-bones and is really only a proof of concept; a good amount of
>>> > work is
>>> > > >>>> needed to turn it into a usable implementation. I had hoped it
>>> > would perform
>>> > > >>>> markedly better than gRPC, at least in this early test, but this
>>> > seems not
>>> > > >>>> to be the case. That said: I am likely not using UCX properly, UCX
>>> > would
>>> > > >>>> still open up support for additional hardware, and this work should
>>> > allow
>>> > > >>>> other backends to be implemented more easily.
>>> > > >>>>
>>> > > >>>> The branch can be viewed at
>>> > > >>>> https://github.com/lidavidm/arrow/tree/flight-ucx
>>> > > >>>>
>>> > > >>>> I've attached the benchmark output at the end.
>>> > > >>>>
>>> > > >>>> There are still quite a few TODOs and things that need
>>> > investigating:
>>> > > >>>>
>>> > > >>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at
>>> > that.
>>> > > >>>> - Concurrent requests are not supported (not even making more than one
>>> > > >>>>     request on a connection), nor does the server support concurrent clients.
>>> > > >>>>     We also need to decide whether to even support concurrent
>>> > requests, and
>>> > > >>>>     how (e.g. pooling multiple connections, or implementing a
>>> > gRPC/HTTP2 style
>>> > > >>>>     protocol, or even possibly implementing HTTP2).
>>> > > >>>> - We need to make sure we properly handle errors, etc. everywhere.
>>> > > >>>> - Are we using UCX in a performant and idiomatic manner? Will the
>>> > > >>>>     implementation work well on RDMA and other specialized hardware?
>>> > > >>>> - Do we also need to support the UCX tag API?
>>> > > >>>> - Can we refactor out interfaces that allow sharing more of the
>>> > > >>>>     client/server implementation between different backends?
>>> > > >>>> - Are the abstractions sufficient to support other potential
>>> > backends like
>>> > > >>>>     MPI, libfabrics, or WebSockets?
>>> > > >>>>
>>> > > >>>> If anyone has experience with UCX, I'd appreciate any feedback.
>>> > Otherwise,
>>> > > >>>> I'm hoping to plan out and try to tackle some of the TODOs above,
>>> > and figure
>>> > > >>>> out how this effort can proceed.
>>> > > >>>>
>>> > > >>>> Antoine/Micah raised the possibility of extending gRPC instead.
>>> > That would
>>> > > >>>> be preferable, frankly, given that otherwise we might have to
>>> > re-implement a
>>> > > >>>> lot of what gRPC and HTTP2 provide by ourselves. However, the
>>> > necessary
>>> > > >>>> proposal stalled and was dropped without much discussion:
>>> > > >>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
>>> > > >>>>
>>> > > >>>> Benchmark results (also uploaded at
>>> > > >>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
>>> > > >>>>
>>> > > >>>> Testing was done between two t3.xlarge instances in the same zone.
>>> > > >>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
>>> > > >>>>
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>>> > ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
>>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
>>> > -records_per_batch=4096
>>> > > >>>> Testing method: DoGet
>>> > > >>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627
>>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>> > > >>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627
>>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>> > > >>>> Number of perf runs: 1
>>> > > >>>> Number of concurrent gets/puts: 1
>>> > > >>>> Batch size: 131072
>>> > > >>>> Batches read: 10000
>>> > > >>>> Bytes read: 1310720000
>>> > > >>>> Nanos: 2165862969
>>> > > >>>> Speed: 577.137 MB/s
>>> > > >>>> Throughput: 4617.1 batches/s
>>> > > >>>> Latency mean: 214 us
>>> > > >>>> Latency quantile=0.5: 209 us
>>> > > >>>> Latency quantile=0.95: 340 us
>>> > > >>>> Latency quantile=0.99: 409 us
>>> > > >>>> Latency max: 6350 us
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>>> > ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
>>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
>>> > -records_per_batch=65536
>>> > > >>>> Testing method: DoGet
>>> > > >>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627
>>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>> > > >>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627
>>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>> > > >>>> Number of perf runs: 1
>>> > > >>>> Number of concurrent gets/puts: 1
>>> > > >>>> Batch size: 2097152
>>> > > >>>> Batches read: 10000
>>> > > >>>> Bytes read: 20971520000
>>> > > >>>> Nanos: 34184175236
>>> > > >>>> Speed: 585.066 MB/s
>>> > > >>>> Throughput: 292.533 batches/s
>>> > > >>>> Latency mean: 3415 us
>>> > > >>>> Latency quantile=0.5: 3408 us
>>> > > >>>> Latency quantile=0.95: 3549 us
>>> > > >>>> Latency quantile=0.99: 3800 us
>>> > > >>>> Latency max: 20236 us
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>>> > ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
>>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
>>> > -records_per_batch=4096
>>> > > >>>> Testing method: DoGet
>>> > > >>>> Using standalone TCP server
>>> > > >>>> Server host: 172.31.34.4
>>> > > >>>> Server port: 31337
>>> > > >>>> Number of perf runs: 1
>>> > > >>>> Number of concurrent gets/puts: 1
>>> > > >>>> Batch size: 131072
>>> > > >>>> Batches read: 10000
>>> > > >>>> Bytes read: 1310720000
>>> > > >>>> Nanos: 2375289668
>>> > > >>>> Speed: 526.252 MB/s
>>> > > >>>> Throughput: 4210.01 batches/s
>>> > > >>>> Latency mean: 235 us
>>> > > >>>> Latency quantile=0.5: 203 us
>>> > > >>>> Latency quantile=0.95: 328 us
>>> > > >>>> Latency quantile=0.99: 1377 us
>>> > > >>>> Latency max: 17860 us
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>>> > ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
>>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
>>> > -records_per_batch=65536
>>> > > >>>> Testing method: DoGet
>>> > > >>>> Using standalone TCP server
>>> > > >>>> Server host: 172.31.34.4
>>> > > >>>> Server port: 31337
>>> > > >>>> Number of perf runs: 1
>>> > > >>>> Number of concurrent gets/puts: 1
>>> > > >>>> Batch size: 2097152
>>> > > >>>> Batches read: 10000
>>> > > >>>> Bytes read: 20971520000
>>> > > >>>> Nanos: 34202704498
>>> > > >>>> Speed: 584.749 MB/s
>>> > > >>>> Throughput: 292.375 batches/s
>>> > > >>>> Latency mean: 3416 us
>>> > > >>>> Latency quantile=0.5: 3406 us
>>> > > >>>> Latency quantile=0.95: 3548 us
>>> > > >>>> Latency quantile=0.99: 3764 us
>>> > > >>>> Latency max: 17086 us
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4
>>> > -p 1337 -Z -l 1M
>>> > > >>>> Connecting to host 172.31.34.4, port 1337
>>> > > >>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port
>>> > 1337
>>> > > >>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
>>> > > >>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35
>>> > MBytes
>>> > > >>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> - - - - - - - - - - - - - - - - - - - - - - - - -
>>> > > >>>> [ ID] Interval           Transfer     Bitrate         Retr
>>> > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36
>>> >    sender
>>> > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec
>>> >   receiver
>>> > > >>>>
>>> > > >>>> iperf Done.
>>> > > >>>>
>>> > > >>>> Best,
>>> > > >>>> David
>>> > > >>>>
>>> > > >>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
>>> > > >>>>> "David Li" <li...@apache.org> writes:
>>> > > >>>>>
>>> > > >>>>>> Thanks for the clarification Yibo, looking forward to the
>>> > results. Even if it is a very hacky PoC it will be interesting to see how
>>> > it affects performance, though as Keith points out there are benefits in
>>> > general to UCX (or similar library), and we can work out the implementation
>>> > plan from there.
>>> > > >>>>>>
>>> > > >>>>>> To Benson's point - the work done to get UCX supported would pave
>>> > the way to supporting other backends as well. I'm personally not familiar
>>> > with UCX, MPI, etc. so is MPI here more about playing well with established
>>> > practices or does it also offer potential hardware support/performance
>>> > improvements like UCX would?
>>> > > >>>>>
>>> > > >>>>> There are two main implementations of MPI, MPICH and Open MPI,
>>> > both of which are permissively licensed open source community projects.
>>> > Both have direct support for UCX and unless your needs are very specific,
>>> > the overhead of going through MPI is likely to be negligible. Both also
>>> > have proprietary derivatives, such as Cray MPI (MPICH derivative) and
>>> > Spectrum MPI (Open MPI derivative), which may have optimizations for
>>> > proprietary networks. Both MPICH and Open MPI can be built without UCX, and
>>> > this is often easier (UCX 'master' is more volatile in my experience).
>>> > > >>>>>
>>> > > >>>>> The vast majority of distributed memory scientific applications
>>> > use MPI or higher level libraries, rather than writing directly to UCX
>>> > (which provides less coverage of HPC networks). I think MPI compatibility
>>> > is important.
>>> > > >>>>>
>>> > > >>>>>   From way up-thread (sorry):
>>> > > >>>>>
>>> > > >>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As
>>> > another
>>> > > >>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with
>>> > the HPC
>>> > > >>>>>>>>>>>> space.
>>> > > >>>>>
>>> > > >>>>> MPI has collective operations like MPI_Allreduce (perform a
>>> > reduction and give every process the result; these run in log(P) or better
>>> > time with small constants -- 15 microseconds is typical for a cheap
>>> > reduction operation on a million processes). MPI supports user-defined
>>> > operations for reductions and prefix-scan operations. If we defined MPI_Ops
>>> > for Arrow types, we could compute summary statistics and other algorithmic
>>> > building blocks fast at arbitrary scale.
>>> > > >>>>>
>>> > > >>>>> The collective execution model might not be everyone's bag, but
>>> > MPI_Op can also be used in one-sided operations (MPI_Accumulate and
>>> > MPI_Fetch_and_op) and dropping into collective mode has big advantages for
>>> > certain algorithms in computational statistics/machine learning.
>>> > > >>>>>
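
To make the MPI_Op idea above concrete, here is a minimal sketch (plain MPI, C++): a user-defined reduction summing a contiguous buffer of doubles, standing in for the kind of per-column reduction one could define for Arrow types. It is not actual Arrow code; a real Arrow MPI_Op would also have to handle validity bitmaps and variable-length types.

// Sketch only: a user-defined MPI_Op summing a contiguous buffer of doubles,
// standing in for the per-column reductions one could define for Arrow types.
#include <mpi.h>
#include <cstdio>

static void SumDoubles(void* invec, void* inoutvec, int* len, MPI_Datatype*) {
  // A real Arrow MPI_Op would also inspect the datatype and the null bitmap.
  double* in = static_cast<double*>(invec);
  double* inout = static_cast<double*>(inoutvec);
  for (int i = 0; i < *len; ++i) inout[i] += in[i];
}

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  MPI_Op sum_op;
  MPI_Op_create(&SumDoubles, /*commute=*/1, &sum_op);

  double local[2] = {1.0 * rank, 2.0 * rank};  // pretend column chunk
  double global[2] = {0.0, 0.0};
  MPI_Allreduce(local, global, 2, MPI_DOUBLE, sum_op, MPI_COMM_WORLD);

  if (rank == 0) std::printf("sums: %f %f\n", global[0], global[1]);

  MPI_Op_free(&sum_op);
  MPI_Finalize();
  return 0;
}
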
>>> > > >>>>
>>> > > >>>
>>> > > >>
>>> > > >
>>> > >
>>> >
>>> 
>>> 
>>> -- 
>>> Supun Kamburugamuve
>>>

Re: Arrow in HPC

Posted by David Li <li...@apache.org>.
For anyone interested, the PR is finally up and ready: https://github.com/apache/arrow/pull/12442

As part of this, Flight in C++ was refactored to allow plugging in alternative transports. There's more work to be done there (auth, middleware, etc. need to be uplifted into the common layer), but this should enable UCX and potentially other network transports.

There are still some caveats as described in the PR itself, including some edge cases I need to track down and missing support for a variety of features, but the core data plane methods are supported and the Flight benchmark can be run.

Thanks to Yibo Cai, Pavel Shamis, Antoine Pitrou (among others) for assistance and review, and the HPC Advisory Council for granting access to an HPC cluster to help with development and testing.
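
To give a rough idea of what plugging in an alternative transport involves, below is a minimal, hypothetical sketch of a data-plane transport interface with registration by URI scheme. The names (ClientDataTransport, FlightPayload, RegisterTransport) are illustrative assumptions only and do not correspond to the classes actually introduced in the PR.

// Hypothetical sketch only: illustrative names, not the interfaces in the PR.
// A transport implements just the low-level data-plane calls; auth,
// middleware, etc. stay in a shared layer above it.
#include <functional>
#include <map>
#include <memory>
#include <string>

struct Status {};                 // stand-in for arrow::Status
struct Ticket { std::string ticket; };
struct FlightPayload {};          // stand-in for a serialized FlightData message

class ClientDataTransport {
 public:
  virtual ~ClientDataTransport() = default;
  // Establish a connection to a location such as grpc+tcp://... or ucx://...
  virtual Status Connect(const std::string& uri) = 0;
  // Start a DoGet call; the callback is invoked once per received payload.
  virtual Status DoGet(const Ticket& ticket,
                       std::function<Status(FlightPayload)> on_payload) = 0;
};

// Transports register by URI scheme so a client can be chosen from the
// location prefix returned by GetFlightInfo.
using TransportFactory = std::function<std::unique_ptr<ClientDataTransport>()>;

std::map<std::string, TransportFactory>& TransportRegistry() {
  static std::map<std::string, TransportFactory> registry;
  return registry;
}

void RegisterTransport(const std::string& scheme, TransportFactory factory) {
  TransportRegistry()[scheme] = std::move(factory);
}

A gRPC transport and a UCX transport would then register under their respective schemes, and the rest of the client code would only interact with this common transport surface.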

On Tue, Jan 18, 2022, at 18:33, David Li wrote:
> Ah, yes, thanks for the reminder. That's one of the things that needs 
> to be addressed for sure.
>
> -David
>
> On Tue, Jan 18, 2022, at 17:48, Supun Kamburugamuve wrote:
>> One general observation. I think this implementation uses polling to check
>> progress. Because of the client-server semantics of Arrow Flight, you may
>> need to use interrupt-based polling (e.g. epoll) to avoid busy looping.
>> 
>> Best,
>> Supun..
>> 
>> On Tue, Jan 18, 2022 at 8:13 AM David Li <li...@apache.org> wrote:
>> 
>> > Thanks for those results, Yibo! Looks like there's still more room for
>> > improvement here. Yes, things are a little unstable, though I didn't
>> > have much trouble just starting the benchmark - I will need
>> > to find suitable hardware and iron out these issues. Note that I've
>> > only implemented DoGet, and I haven't implemented concurrent streams,
>> > which would explain why most benchmark configurations hang or error.
>> >
>> > Since the last time, I've rewritten the prototype to use UCX's "active
>> > message" functionality instead of trying to implement messages over
>> > the "streams" API. This simplified the code. I also did some
>> > refactoring along the lines of Yibo's prototype to share more code
>> > between the gRPC and UCX implementations. Here are some benchmark
>> > numbers:
>> >
>> > For IPC (server/client on the same machine): UCX with shared memory
>> > handily beats gRPC here. UCX with TCP isn't quite up to par, though.
>> >
>> > gRPC:
>> > 128KiB batches: 4463 MiB/s
>> > 2MiB batches:   3537 MiB/s
>> > 32MiB batches:  1828 MiB/s
>> >
>> > UCX (shared memory):
>> > 128KiB batches: 6500 MiB/s
>> > 2MiB batches:  13879 MiB/s
>> > 32MiB batches:  9045 MiB/s
>> >
>> > UCX (TCP):
>> > 128KiB batches: 1069 MiB/s
>> > 2MiB batches:   1735 MiB/s
>> > 32MiB batches:  1602 MiB/s
>> >
>> > For RPC (server/client on different machines): Two t3.xlarge (4 core,
>> > 16 thread) machines were used in AWS EC2. These have "up to" 5Gbps
>> > bandwidth. This isn't really a scenario where UCX is expected to
>> > shine, however, UCX performs comparably to gRPC here.
>> >
>> > gRPC:
>> > 128 KiB batches: 554 MiB/s
>> > 2 MiB batches:   575 MiB/s
>> >
>> > UCX:
>> > 128 KiB batches: 546 MiB/s
>> > 2 MiB batches:   567 MiB/s
>> >
>> > Raw test logs can be found here:
>> > https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc
>> >
>> > For IPC, the shared memory results are promising in that it could be
>> > feasible to expose a library purely over Flight without worrying about
>> > FFI bindings. Also, it seems results are roughly comparable to what
>> > Yibo observed in ARROW-15282 [1] meaning UCX will get us both a
>> > performant shared memory transport and support for more exotic
>> > hardware.
>> >
>> > There's still much work to be done; at this point, I'd like to start
>> > implementing the rest of the Flight methods, fixing up the many TODOs
>> > scattered around, trying to refactor more things to share code between
>> > gRPC/UCX, and find and benchmark some hardware that UCX has a fast
>> > path for.
>> >
>> > [1]: https://issues.apache.org/jira/browse/ARROW-15282
>> >
>> > -David
>> >
>> > On Tue, Jan 18, 2022, at 04:35, Yibo Cai wrote:
>> > > Some updates.
>> > >
>> > > I tested David's UCX transport patch over 100Gb network. FlightRPC over
>> > > UCX/RDMA improves throughput about 50%, with lower and flat latency.
>> > > And I think there are chances to improve further. See test report [1].
>> > >
>> > > For the data plane approach, the PoC shared memory data plane also
>> > > introduces a significant performance boost. Details at [2].
>> > >
>> > > Glad to see there is big potential to improve FlightRPC performance.
>> > >
>> > > [1] https://issues.apache.org/jira/browse/ARROW-15229
>> > > [2] https://issues.apache.org/jira/browse/ARROW-15282
>> > >
>> > > On 12/30/21 11:57 PM, David Li wrote:
>> > > > Ah, I see.
>> > > >
>> > > > I think both projects can proceed as well. At some point we will have
>> > to figure out how to merge them, but I think it's too early to see how
>> > exactly we will want to refactor things.
>> > > >
>> > > > I looked over the code and I don't have any important comments for
>> > now. Looking forward to reviewing when it's ready.
>> > > >
>> > > > -David
>> > > >
>> > > > On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
>> > > >>
>> > > >>
>> > > >> On 12/29/21 11:03 PM, David Li wrote:
>> > > >>> Awesome, thanks for sharing this too!
>> > > >>>
>> > > >>> The refactoring you have with DataClientStream what I would like to
>> > do as well - I think much of the existing code can be adapted to be more
>> > transport-agnostic and then it will be easier to support new transports
>> > (whether data-only or for all methods).
>> > > >>>
>> > > >>> Where do you see the gaps between gRPC and this? I think what would
>> > happen is 1) client calls GetFlightInfo 2) server returns a `shm://` URI 3)
>> > client sees the unfamiliar prefix and creates a new client for the DoGet
>> > call (it would have to do this anyways if, for instance, the GetFlightInfo
>> > call returned the address of a different server).
>> > > >>>
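
The GetFlightInfo-then-reconnect flow described above could look roughly like the sketch below; Client, Location, and FlightInfo here are hypothetical stand-ins rather than the actual arrow::flight classes.

// Sketch only, with hypothetical helper types; a real implementation would use
// arrow::flight::FlightClient, FlightInfo, and Location instead.
#include <memory>
#include <string>
#include <vector>

struct Location { std::string uri; };   // e.g. "grpc+tcp://...", "shm://..."
struct Endpoint { std::string ticket; std::vector<Location> locations; };
struct FlightInfo { std::vector<Endpoint> endpoints; };
struct DataStream {};                   // stand-in for a record batch stream

class Client {                          // hypothetical client facade
 public:
  static std::unique_ptr<Client> Connect(const Location& location);
  FlightInfo GetFlightInfo(const std::string& descriptor);
  DataStream DoGet(const std::string& ticket);
};

DataStream FetchFirstEndpoint(Client& control_client,
                              const std::string& descriptor) {
  // 1) Ask the control-plane client where the data lives.
  FlightInfo info = control_client.GetFlightInfo(descriptor);
  const Endpoint& ep = info.endpoints.front();
  const Location& loc = ep.locations.front();
  // 2)+3) The location may use an unfamiliar scheme (e.g. shm://): open a
  // dedicated client for the data call, exactly as if the endpoint pointed
  // at a different server.
  std::unique_ptr<Client> data_client = Client::Connect(loc);
  return data_client->DoGet(ep.ticket);
}
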
>> > > >>
>> > > >> I mean implementation details. Some unit tests run longer than
>> > > >> expected (data plane timeouts reading from an ended stream). It looks
>> > > >> like the grpc stream finish message is not correctly intercepted and
>> > > >> forwarded to the data plane. I don't think it's a big problem, I just
>> > > >> need some time to debug.
>> > > >>
>> > > >>> I also wonder how this stacks up to UCX's shared memory backend (I
>> > did not test this though).
>> > > >>>
>> > > >>
>> > > >> I implemented a shared memory data plane only to verify and
>> > consolidate
>> > > >> the data plane design, as it's the easiest (and useful) driver. I also
>> > > >> plan to implement a socket based data plane, not useful in practice,
>> > > >> only to make sure the design works OK across the network. Then we can add
>> > > >> more useful drivers like UCX or DPDK (the benefit of DPDK is it works
>> > on
>> > > >> commodity hardware, unlike UCX/RDMA which requires expensive
>> > equipment).
>> > > >>
>> > > >>> I would like to be able to support entire new transports for certain
>> > cases (namely browser support - though perhaps one of the gRPC proxies
>> > would suffice there), but even in that case, we could make it so that a new
>> > transport only needs to implement the data plane methods. Only having to
>> > support the data plane methods would save significant implementation effort
>> > for all non-browser cases so I think it's a worthwhile approach.
>> > > >>>
>> > > >>
>> > > >> Thanks for being interested in this approach. My current plan is to
>> > first
>> > > >> refactor shared memory data plane to verify it beats grpc in local rpc
>> > > >> by considerable margin, otherwise there must be big mistakes in my
>> > > >> design. After that I will fix unit test issues and deliver for
>> > community
>> > > >> review.
>> > > >>
>> > > >> Anyway, don't let me block your implementations. And if you think it's
>> > > >> useful, I can push current code for more detailed discussion.
>> > > >>
>> > > >>> -David
>> > > >>>
>> > > >>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
>> > > >>>> Thanks David for initiating the UCX integration, great work!
>> > > >>>> I think 5Gbps network is too limited for performance evaluation. I
>> > will try the patch on 100Gb RDMA network, hopefully we can see some
>> > improvements.
>> > > >>>> I once benchmarked flight over 100Gb network [1], grpc based
>> > throughput is 2.4GB/s for one thread, 8.8GB/s for six threads, about 60us
>> > latency. I also benchmarked raw RDMA performance (same batch sizes as
>> > flight), one thread can achieve 9GB/s with 12us latency. Of course the
>> > comparison is not fair. With David's patch, we can get a more realistic
>> > comparison.
>> > > >>>>
>> > > >>>> I'm implementing a data plane approach in the hope that we can adopt new
>> > data acceleration methods easily. My approach is to replace only the
>> > FlightData transmission of DoGet/Put/Exchange with data plane drivers, and
>> > grpc is still used for all rpc calls.
>> > > >>>> Code is at my github repo [2]. Besides the framework, I just
>> > implemented a shared memory data plane driver as PoC. Get/Put/Exchange unit
>> > tests passed, TestCancel hangs, some unit tests run longer than expected,
>> > still debugging. The shared memory data plane performance is pretty bad
>> > now, due to repeated map/unmap for each read/write, pre-allocated pages
>> > should improve much, still experimenting.
>> > > >>>>
>> > > >>>> Would like to hear community comments.
>> > > >>>>
>> > > >>>> My personal opinion is that the data plane approach reuses the grpc
>> > control plane and may make it easier to add new data acceleration methods,
>> > but it needs to fit into grpc seamlessly (there are still unresolved gaps).
>> > A new transport requires much more initial effort, but may pay off later.
>> > And it looks like these two approaches don't conflict with each other.
>> > > >>>>
>> > > >>>> [1] Test environment
>> > > >>>> nics: mellanox connectx5
>> > > >>>> hosts: client (neoverse n1), server (xeon gold 5218)
>> > > >>>> os: ubuntu 20.04, linux kernel 5.4
>> > > >>>> test case: 128k batch size, DoGet
>> > > >>>>
>> > > >>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
>> > > >>>>
>> > > >>>> ________________________________
>> > > >>>> From: David Li <li...@apache.org>
>> > > >>>> Sent: Wednesday, December 29, 2021 3:09 AM
>> > > >>>> To: dev@arrow.apache.org <de...@arrow.apache.org>
>> > > >>>> Subject: Re: Arrow in HPC
>> > > >>>>
>> > > >>>> I ended up drafting an implementation of Flight based on UCX, and
>> > doing some
>> > > >>>> of the necessary refactoring to support additional backends in the
>> > future.
>> > > >>>> It can run the Flight benchmark, and performance is about
>> > comparable to
>> > > >>>> gRPC, as tested on AWS EC2.
>> > > >>>>
>> > > >>>> The implementation is based on the UCP streams API. It's extremely
>> > > >>>> bare-bones and is really only a proof of concept; a good amount of
>> > work is
>> > > >>>> needed to turn it into a usable implementation. I had hoped it
>> > would perform
>> > > >>>> markedly better than gRPC, at least in this early test, but this
>> > seems not
>> > > >>>> to be the case. That said: I am likely not using UCX properly, UCX
>> > would
>> > > >>>> still open up support for additional hardware, and this work should
>> > allow
>> > > >>>> other backends to be implemented more easily.
>> > > >>>>
>> > > >>>> The branch can be viewed at
>> > > >>>> https://github.com/lidavidm/arrow/tree/flight-ucx
>> > > >>>>
>> > > >>>> I've attached the benchmark output at the end.
>> > > >>>>
>> > > >>>> There are still quite a few TODOs and things that need
>> > investigating:
>> > > >>>>
>> > > >>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at
>> > that.
>> > > >>>> - Concurrent requests are not supported, or even making more than
>> > one
>> > > >>>>     request on a connection, nor does the server support concurrent
>> > clients.
>> > > >>>>     We also need to decide whether to even support concurrent
>> > requests, and
>> > > >>>>     how (e.g. pooling multiple connections, or implementing a
>> > gRPC/HTTP2 style
>> > > >>>>     protocol, or even possibly implementing HTTP2).
>> > > >>>> - We need to make sure we properly handle errors, etc. everywhere.
>> > > >>>> - Are we using UCX in a performant and idiomatic manner? Will the
>> > > >>>>     implementation work well on RDMA and other specialized hardware?
>> > > >>>> - Do we also need to support the UCX tag API?
>> > > >>>> - Can we refactor out interfaces that allow sharing more of the
>> > > >>>>     client/server implementation between different backends?
>> > > >>>> - Are the abstractions sufficient to support other potential
>> > backends like
>> > > >>>>     MPI, libfabrics, or WebSockets?
>> > > >>>>
>> > > >>>> If anyone has experience with UCX, I'd appreciate any feedback.
>> > Otherwise,
>> > > >>>> I'm hoping to plan out and try to tackle some of the TODOs above,
>> > and figure
>> > > >>>> out how this effort can proceed.
>> > > >>>>
>> > > >>>> Antoine/Micah raised the possibility of extending gRPC instead.
>> > That would
>> > > >>>> be preferable, frankly, given that otherwise we might have to
>> > re-implement a
>> > > >>>> lot of what gRPC and HTTP2 provide by ourselves. However, the
>> > necessary
>> > > >>>> proposal stalled and was dropped without much discussion:
>> > > >>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
>> > > >>>>
>> > > >>>> Benchmark results (also uploaded at
>> > > >>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
>> > > >>>>
>> > > >>>> Testing was done between two t3.xlarge instances in the same zone.
>> > > >>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
>> > > >>>>
>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>> > ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
>> > -records_per_batch=4096
>> > > >>>> Testing method: DoGet
>> > > >>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627
>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>> > > >>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627
>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>> > > >>>> Number of perf runs: 1
>> > > >>>> Number of concurrent gets/puts: 1
>> > > >>>> Batch size: 131072
>> > > >>>> Batches read: 10000
>> > > >>>> Bytes read: 1310720000
>> > > >>>> Nanos: 2165862969
>> > > >>>> Speed: 577.137 MB/s
>> > > >>>> Throughput: 4617.1 batches/s
>> > > >>>> Latency mean: 214 us
>> > > >>>> Latency quantile=0.5: 209 us
>> > > >>>> Latency quantile=0.95: 340 us
>> > > >>>> Latency quantile=0.99: 409 us
>> > > >>>> Latency max: 6350 us
>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>> > ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
>> > -records_per_batch=65536
>> > > >>>> Testing method: DoGet
>> > > >>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627
>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>> > > >>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627
>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>> > > >>>> Number of perf runs: 1
>> > > >>>> Number of concurrent gets/puts: 1
>> > > >>>> Batch size: 2097152
>> > > >>>> Batches read: 10000
>> > > >>>> Bytes read: 20971520000
>> > > >>>> Nanos: 34184175236
>> > > >>>> Speed: 585.066 MB/s
>> > > >>>> Throughput: 292.533 batches/s
>> > > >>>> Latency mean: 3415 us
>> > > >>>> Latency quantile=0.5: 3408 us
>> > > >>>> Latency quantile=0.95: 3549 us
>> > > >>>> Latency quantile=0.99: 3800 us
>> > > >>>> Latency max: 20236 us
>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>> > ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
>> > -records_per_batch=4096
>> > > >>>> Testing method: DoGet
>> > > >>>> Using standalone TCP server
>> > > >>>> Server host: 172.31.34.4
>> > > >>>> Server port: 31337
>> > > >>>> Number of perf runs: 1
>> > > >>>> Number of concurrent gets/puts: 1
>> > > >>>> Batch size: 131072
>> > > >>>> Batches read: 10000
>> > > >>>> Bytes read: 1310720000
>> > > >>>> Nanos: 2375289668
>> > > >>>> Speed: 526.252 MB/s
>> > > >>>> Throughput: 4210.01 batches/s
>> > > >>>> Latency mean: 235 us
>> > > >>>> Latency quantile=0.5: 203 us
>> > > >>>> Latency quantile=0.95: 328 us
>> > > >>>> Latency quantile=0.99: 1377 us
>> > > >>>> Latency max: 17860 us
>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>> > ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
>> > -records_per_batch=65536
>> > > >>>> Testing method: DoGet
>> > > >>>> Using standalone TCP server
>> > > >>>> Server host: 172.31.34.4
>> > > >>>> Server port: 31337
>> > > >>>> Number of perf runs: 1
>> > > >>>> Number of concurrent gets/puts: 1
>> > > >>>> Batch size: 2097152
>> > > >>>> Batches read: 10000
>> > > >>>> Bytes read: 20971520000
>> > > >>>> Nanos: 34202704498
>> > > >>>> Speed: 584.749 MB/s
>> > > >>>> Throughput: 292.375 batches/s
>> > > >>>> Latency mean: 3416 us
>> > > >>>> Latency quantile=0.5: 3406 us
>> > > >>>> Latency quantile=0.95: 3548 us
>> > > >>>> Latency quantile=0.99: 3764 us
>> > > >>>> Latency max: 17086 us
>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4
>> > -p 1337 -Z -l 1M
>> > > >>>> Connecting to host 172.31.34.4, port 1337
>> > > >>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port
>> > 1337
>> > > >>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
>> > > >>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35
>> > MBytes
>> > > >>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43
>> > MBytes
>> > > >>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43
>> > MBytes
>> > > >>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
>> > MBytes
>> > > >>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
>> > MBytes
>> > > >>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43
>> > MBytes
>> > > >>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43
>> > MBytes
>> > > >>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43
>> > MBytes
>> > > >>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43
>> > MBytes
>> > > >>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43
>> > MBytes
>> > > >>>> - - - - - - - - - - - - - - - - - - - - - - - - -
>> > > >>>> [ ID] Interval           Transfer     Bitrate         Retr
>> > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36
>> >    sender
>> > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec
>> >   receiver
>> > > >>>>
>> > > >>>> iperf Done.
>> > > >>>>
>> > > >>>> Best,
>> > > >>>> David
>> > > >>>>
>> > > >>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
>> > > >>>>> "David Li" <li...@apache.org> writes:
>> > > >>>>>
>> > > >>>>>> Thanks for the clarification Yibo, looking forward to the
>> > results. Even if it is a very hacky PoC it will be interesting to see how
>> > it affects performance, though as Keith points out there are benefits in
>> > general to UCX (or similar library), and we can work out the implementation
>> > plan from there.
>> > > >>>>>>
>> > > >>>>>> To Benson's point - the work done to get UCX supported would pave
>> > the way to supporting other backends as well. I'm personally not familiar
>> > with UCX, MPI, etc. so is MPI here more about playing well with established
>> > practices or does it also offer potential hardware support/performance
>> > improvements like UCX would?
>> > > >>>>>
>> > > >>>>> There are two main implementations of MPI, MPICH and Open MPI,
>> > both of which are permissively licensed open source community projects.
>> > Both have direct support for UCX and unless your needs are very specific,
>> > the overhead of going through MPI is likely to be negligible. Both also
>> > have proprietary derivatives, such as Cray MPI (MPICH derivative) and
>> > Spectrum MPI (Open MPI derivative), which may have optimizations for
>> > proprietary networks. Both MPICH and Open MPI can be built without UCX, and
>> > this is often easier (UCX 'master' is more volatile in my experience).
>> > > >>>>>
>> > > >>>>> The vast majority of distributed memory scientific applications
>> > use MPI or higher level libraries, rather than writing directly to UCX
>> > (which provides less coverage of HPC networks). I think MPI compatibility
>> > is important.
>> > > >>>>>
>> > > >>>>>   From way up-thread (sorry):
>> > > >>>>>
>> > > >>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As
>> > another
>> > > >>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with
>> > the HPC
>> > > >>>>>>>>>>>> space.
>> > > >>>>>
>> > > >>>>> MPI has collective operations like MPI_Allreduce (perform a
>> > reduction and give every process the result; these run in log(P) or better
>> > time with small constants -- 15 microseconds is typical for a cheap
>> > reduction operation on a million processes). MPI supports user-defined
>> > operations for reductions and prefix-scan operations. If we defined MPI_Ops
>> > for Arrow types, we could compute summary statistics and other algorithmic
>> > building blocks fast at arbitrary scale.
>> > > >>>>>
>> > > >>>>> The collective execution model might not be everyone's bag, but
>> > MPI_Op can also be used in one-sided operations (MPI_Accumulate and
>> > MPI_Fetch_and_op) and dropping into collective mode has big advantages for
>> > certain algorithms in computational statistics/machine learning.
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > > >
>> > >
>> >
>> 
>> 
>> -- 
>> Supun Kamburugamuve
>>

Re: Arrow in HPC

Posted by David Li <li...@apache.org>.
Ah, yes, thanks for the reminder. That's one of the things that needs to be addressed for sure.

-David

On Tue, Jan 18, 2022, at 17:48, Supun Kamburugamuve wrote:
> One general observation. I think this implementation uses polling to check
> progress. Because of the client-server semantics of Arrow Flight, you may
> need to use interrupt-based polling (e.g. epoll) to avoid busy looping.
> 
> Best,
> Supun..
> 
> On Tue, Jan 18, 2022 at 8:13 AM David Li <li...@apache.org> wrote:
> 
> > Thanks for those results, Yibo! Looks like there's still more room for
> > improvement here. Yes, things are a little unstable, though I didn't
> > have much trouble just starting the benchmark - I will need
> > to find suitable hardware and iron out these issues. Note that I've
> > only implemented DoGet, and I haven't implemented concurrent streams,
> > which would explain why most benchmark configurations hang or error.
> >
> > Since the last time, I've rewritten the prototype to use UCX's "active
> > message" functionality instead of trying to implement messages over
> > the "streams" API. This simplified the code. I also did some
> > refactoring along the lines of Yibo's prototype to share more code
> > between the gRPC and UCX implementations. Here are some benchmark
> > numbers:
> >
> > For IPC (server/client on the same machine): UCX with shared memory
> > handily beats gRPC here. UCX with TCP isn't quite up to par, though.
> >
> > gRPC:
> > 128KiB batches: 4463 MiB/s
> > 2MiB batches:   3537 MiB/s
> > 32MiB batches:  1828 MiB/s
> >
> > UCX (shared memory):
> > 128KiB batches: 6500 MiB/s
> > 2MiB batches:  13879 MiB/s
> > 32MiB batches:  9045 MiB/s
> >
> > UCX (TCP):
> > 128KiB batches: 1069 MiB/s
> > 2MiB batches:   1735 MiB/s
> > 32MiB batches:  1602 MiB/s
> >
> > For RPC (server/client on different machines): Two t3.xlarge (4 core,
> > 16 thread) machines were used in AWS EC2. These have "up to" 5Gbps
> > bandwidth. This isn't really a scenario where UCX is expected to
> > shine, however, UCX performs comparably to gRPC here.
> >
> > gRPC:
> > 128 KiB batches: 554 MiB/s
> > 2 MiB batches:   575 MiB/s
> >
> > UCX:
> > 128 KiB batches: 546 MiB/s
> > 2 MiB batches:   567 MiB/s
> >
> > Raw test logs can be found here:
> > https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc
> >
> > For IPC, the shared memory results are promising in that it could be
> > feasible to expose a library purely over Flight without worrying about
> > FFI bindings. Also, it seems results are roughly comparable to what
> > Yibo observed in ARROW-15282 [1] meaning UCX will get us both a
> > performant shared memory transport and support for more exotic
> > hardware.
> >
> > There's still much work to be done; at this point, I'd like to start
> > implementing the rest of the Flight methods, fixing up the many TODOs
> > scattered around, trying to refactor more things to share code between
> > gRPC/UCX, and find and benchmark some hardware that UCX has a fast
> > path for.
> >
> > [1]: https://issues.apache.org/jira/browse/ARROW-15282
> >
> > -David
> >
> > On Tue, Jan 18, 2022, at 04:35, Yibo Cai wrote:
> > > Some updates.
> > >
> > > I tested David's UCX transport patch over 100Gb network. FlightRPC over
> > > UCX/RDMA improves throughput about 50%, with lower and flat latency.
> > > And I think there are chances to improve further. See test report [1].
> > >
> > > For the data plane approach, the PoC shared memory data plane also
> > > introduces a significant performance boost. Details at [2].
> > >
> > > Glad to see there is big potential to improve FlightRPC performance.
> > >
> > > [1] https://issues.apache.org/jira/browse/ARROW-15229
> > > [2] https://issues.apache.org/jira/browse/ARROW-15282
> > >
> > > On 12/30/21 11:57 PM, David Li wrote:
> > > > Ah, I see.
> > > >
> > > > I think both projects can proceed as well. At some point we will have
> > to figure out how to merge them, but I think it's too early to see how
> > exactly we will want to refactor things.
> > > >
> > > > I looked over the code and I don't have any important comments for
> > now. Looking forward to reviewing when it's ready.
> > > >
> > > > -David
> > > >
> > > > On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
> > > >>
> > > >>
> > > >> On 12/29/21 11:03 PM, David Li wrote:
> > > >>> Awesome, thanks for sharing this too!
> > > >>>
> > > >>> The refactoring you have with DataClientStream what I would like to
> > do as well - I think much of the existing code can be adapted to be more
> > transport-agnostic and then it will be easier to support new transports
> > (whether data-only or for all methods).
> > > >>>
> > > >>> Where do you see the gaps between gRPC and this? I think what would
> > happen is 1) client calls GetFlightInfo 2) server returns a `shm://` URI 3)
> > client sees the unfamiliar prefix and creates a new client for the DoGet
> > call (it would have to do this anyways if, for instance, the GetFlightInfo
> > call returned the address of a different server).
> > > >>>
> > > >>
> > > >> I mean implementation details. Some unit tests run longer than expected
> > > >> (data plane timeouts reading from an ended stream). It looks like the
> > > >> grpc stream finish message is not correctly intercepted and forwarded to
> > > >> the data plane. I don't think it's a big problem, I just need some time
> > > >> to debug.
> > > >>
> > > >>> I also wonder how this stacks up to UCX's shared memory backend (I
> > did not test this though).
> > > >>>
> > > >>
> > > >> I implemented a shared memory data plane only to verify and
> > consolidate
> > > >> the data plane design, as it's the easiest (and useful) driver. I also
> > > >> plan to implement a socket based data plane, not useful in practice,
> > > >> only to make sure the design works OK across the network. Then we can add
> > > >> more useful drivers like UCX or DPDK (the benefit of DPDK is it works
> > on
> > > >> commodity hardware, unlike UCX/RDMA which requires expensive
> > equipment).
> > > >>
> > > >>> I would like to be able to support entire new transports for certain
> > cases (namely browser support - though perhaps one of the gRPC proxies
> > would suffice there), but even in that case, we could make it so that a new
> > transport only needs to implement the data plane methods. Only having to
> > support the data plane methods would save significant implementation effort
> > for all non-browser cases so I think it's a worthwhile approach.
> > > >>>
> > > >>
> > > >> Thanks for being interested in this approach. My current plan is to
> > first
> > > >> refactor shared memory data plane to verify it beats grpc in local rpc
> > > >> by considerable margin, otherwise there must be big mistakes in my
> > > >> design. After that I will fix unit test issues and deliver for
> > community
> > > >> review.
> > > >>
> > > >> Anyway, don't let me block your implementations. And if you think it's
> > > >> useful, I can push current code for more detailed discussion.
> > > >>
> > > >>> -David
> > > >>>
> > > >>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
> > > >>>> Thanks David for initiating the UCX integration, great work!
> > > >>>> I think 5Gbps network is too limited for performance evaluation. I
> > will try the patch on 100Gb RDMA network, hopefully we can see some
> > improvements.
> > > >>>> I once benchmarked flight over 100Gb network [1], grpc based
> > throughput is 2.4GB/s for one thread, 8.8GB/s for six threads, about 60us
> > latency. I also benchmarked raw RDMA performance (same batch sizes as
> > flight), one thread can achieve 9GB/s with 12us latency. Of course the
> > comparison is not fair. With David's patch, we can get a more realistic
> > comparison.
> > > >>>>
> > > >>>> I'm implementing a data plane approach in the hope that we can adopt new
> > data acceleration methods easily. My approach is to replace only the
> > FlightData transmission of DoGet/Put/Exchange with data plane drivers, and
> > grpc is still used for all rpc calls.
> > > >>>> Code is at my github repo [2]. Besides the framework, I just
> > implemented a shared memory data plane driver as PoC. Get/Put/Exchange unit
> > tests passed, TestCancel hangs, some unit tests run longer than expected,
> > still debugging. The shared memory data plane performance is pretty bad
> > now, due to repeated map/unmap for each read/write, pre-allocated pages
> > should improve much, still experimenting.
> > > >>>>
> > > >>>> Would like to hear community comments.
> > > >>>>
> > > >>>> My personal opinion is that the data plane approach reuses the grpc
> > control plane and may make it easier to add new data acceleration methods,
> > but it needs to fit into grpc seamlessly (there are still unresolved gaps).
> > A new transport requires much more initial effort, but may pay off later.
> > And it looks like these two approaches don't conflict with each other.
> > > >>>>
> > > >>>> [1] Test environment
> > > >>>> nics: mellanox connectx5
> > > >>>> hosts: client (neoverse n1), server (xeon gold 5218)
> > > >>>> os: ubuntu 20.04, linux kernel 5.4
> > > >>>> test case: 128k batch size, DoGet
> > > >>>>
> > > >>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
> > > >>>>
> > > >>>> ________________________________
> > > >>>> From: David Li <li...@apache.org>
> > > >>>> Sent: Wednesday, December 29, 2021 3:09 AM
> > > >>>> To: dev@arrow.apache.org <de...@arrow.apache.org>
> > > >>>> Subject: Re: Arrow in HPC
> > > >>>>
> > > >>>> I ended up drafting an implementation of Flight based on UCX, and
> > doing some
> > > >>>> of the necessary refactoring to support additional backends in the
> > future.
> > > >>>> It can run the Flight benchmark, and performance is about
> > comparable to
> > > >>>> gRPC, as tested on AWS EC2.
> > > >>>>
> > > >>>> The implementation is based on the UCP streams API. It's extremely
> > > >>>> bare-bones and is really only a proof of concept; a good amount of
> > work is
> > > >>>> needed to turn it into a usable implementation. I had hoped it
> > would perform
> > > >>>> markedly better than gRPC, at least in this early test, but this
> > seems not
> > > >>>> to be the case. That said: I am likely not using UCX properly, UCX
> > would
> > > >>>> still open up support for additional hardware, and this work should
> > allow
> > > >>>> other backends to be implemented more easily.
> > > >>>>
> > > >>>> The branch can be viewed at
> > > >>>> https://github.com/lidavidm/arrow/tree/flight-ucx
> > > >>>>
> > > >>>> I've attached the benchmark output at the end.
> > > >>>>
> > > >>>> There are still quite a few TODOs and things that need
> > investigating:
> > > >>>>
> > > >>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at
> > that.
> > > >>>> - Concurrent requests are not supported, or even making more than
> > one
> > > >>>>     request on a connection, nor does the server support concurrent
> > clients.
> > > >>>>     We also need to decide whether to even support concurrent
> > requests, and
> > > >>>>     how (e.g. pooling multiple connections, or implementing a
> > gRPC/HTTP2 style
> > > >>>>     protocol, or even possibly implementing HTTP2).
> > > >>>> - We need to make sure we properly handle errors, etc. everywhere.
> > > >>>> - Are we using UCX in a performant and idiomatic manner? Will the
> > > >>>>     implementation work well on RDMA and other specialized hardware?
> > > >>>> - Do we also need to support the UCX tag API?
> > > >>>> - Can we refactor out interfaces that allow sharing more of the
> > > >>>>     client/server implementation between different backends?
> > > >>>> - Are the abstractions sufficient to support other potential
> > backends like
> > > >>>>     MPI, libfabrics, or WebSockets?
> > > >>>>
> > > >>>> If anyone has experience with UCX, I'd appreciate any feedback.
> > Otherwise,
> > > >>>> I'm hoping to plan out and try to tackle some of the TODOs above,
> > and figure
> > > >>>> out how this effort can proceed.
> > > >>>>
> > > >>>> Antoine/Micah raised the possibility of extending gRPC instead.
> > That would
> > > >>>> be preferable, frankly, given that otherwise we might have to
> > re-implement a
> > > >>>> lot of what gRPC and HTTP2 provide by ourselves. However, the
> > necessary
> > > >>>> proposal stalled and was dropped without much discussion:
> > > >>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
> > > >>>>
> > > >>>> Benchmark results (also uploaded at
> > > >>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
> > > >>>>
> > > >>>> Testing was done between two t3.xlarge instances in the same zone.
> > > >>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
> > > >>>>
> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> > ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> > -records_per_batch=4096
> > > >>>> Testing method: DoGet
> > > >>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627
> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > > >>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627
> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > > >>>> Number of perf runs: 1
> > > >>>> Number of concurrent gets/puts: 1
> > > >>>> Batch size: 131072
> > > >>>> Batches read: 10000
> > > >>>> Bytes read: 1310720000
> > > >>>> Nanos: 2165862969
> > > >>>> Speed: 577.137 MB/s
> > > >>>> Throughput: 4617.1 batches/s
> > > >>>> Latency mean: 214 us
> > > >>>> Latency quantile=0.5: 209 us
> > > >>>> Latency quantile=0.95: 340 us
> > > >>>> Latency quantile=0.99: 409 us
> > > >>>> Latency max: 6350 us
> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> > ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> > -records_per_batch=65536
> > > >>>> Testing method: DoGet
> > > >>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627
> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > > >>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627
> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > > >>>> Number of perf runs: 1
> > > >>>> Number of concurrent gets/puts: 1
> > > >>>> Batch size: 2097152
> > > >>>> Batches read: 10000
> > > >>>> Bytes read: 20971520000
> > > >>>> Nanos: 34184175236
> > > >>>> Speed: 585.066 MB/s
> > > >>>> Throughput: 292.533 batches/s
> > > >>>> Latency mean: 3415 us
> > > >>>> Latency quantile=0.5: 3408 us
> > > >>>> Latency quantile=0.95: 3549 us
> > > >>>> Latency quantile=0.99: 3800 us
> > > >>>> Latency max: 20236 us
> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> > ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> > -records_per_batch=4096
> > > >>>> Testing method: DoGet
> > > >>>> Using standalone TCP server
> > > >>>> Server host: 172.31.34.4
> > > >>>> Server port: 31337
> > > >>>> Number of perf runs: 1
> > > >>>> Number of concurrent gets/puts: 1
> > > >>>> Batch size: 131072
> > > >>>> Batches read: 10000
> > > >>>> Bytes read: 1310720000
> > > >>>> Nanos: 2375289668
> > > >>>> Speed: 526.252 MB/s
> > > >>>> Throughput: 4210.01 batches/s
> > > >>>> Latency mean: 235 us
> > > >>>> Latency quantile=0.5: 203 us
> > > >>>> Latency quantile=0.95: 328 us
> > > >>>> Latency quantile=0.99: 1377 us
> > > >>>> Latency max: 17860 us
> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> > ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> > -records_per_batch=65536
> > > >>>> Testing method: DoGet
> > > >>>> Using standalone TCP server
> > > >>>> Server host: 172.31.34.4
> > > >>>> Server port: 31337
> > > >>>> Number of perf runs: 1
> > > >>>> Number of concurrent gets/puts: 1
> > > >>>> Batch size: 2097152
> > > >>>> Batches read: 10000
> > > >>>> Bytes read: 20971520000
> > > >>>> Nanos: 34202704498
> > > >>>> Speed: 584.749 MB/s
> > > >>>> Throughput: 292.375 batches/s
> > > >>>> Latency mean: 3416 us
> > > >>>> Latency quantile=0.5: 3406 us
> > > >>>> Latency quantile=0.95: 3548 us
> > > >>>> Latency quantile=0.99: 3764 us
> > > >>>> Latency max: 17086 us
> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4
> > -p 1337 -Z -l 1M
> > > >>>> Connecting to host 172.31.34.4, port 1337
> > > >>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port
> > 1337
> > > >>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
> > > >>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35
> > MBytes
> > > >>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43
> > MBytes
> > > >>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43
> > MBytes
> > > >>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
> > MBytes
> > > >>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
> > MBytes
> > > >>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43
> > MBytes
> > > >>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43
> > MBytes
> > > >>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43
> > MBytes
> > > >>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43
> > MBytes
> > > >>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43
> > MBytes
> > > >>>> - - - - - - - - - - - - - - - - - - - - - - - - -
> > > >>>> [ ID] Interval           Transfer     Bitrate         Retr
> > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36
> >    sender
> > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec
> >   receiver
> > > >>>>
> > > >>>> iperf Done.
> > > >>>>
> > > >>>> Best,
> > > >>>> David
> > > >>>>
> > > >>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> > > >>>>> "David Li" <li...@apache.org> writes:
> > > >>>>>
> > > >>>>>> Thanks for the clarification Yibo, looking forward to the
> > results. Even if it is a very hacky PoC it will be interesting to see how
> > it affects performance, though as Keith points out there are benefits in
> > general to UCX (or similar library), and we can work out the implementation
> > plan from there.
> > > >>>>>>
> > > >>>>>> To Benson's point - the work done to get UCX supported would pave
> > the way to supporting other backends as well. I'm personally not familiar
> > with UCX, MPI, etc. so is MPI here more about playing well with established
> > practices or does it also offer potential hardware support/performance
> > improvements like UCX would?
> > > >>>>>
> > > >>>>> There are two main implementations of MPI, MPICH and Open MPI,
> > both of which are permissively licensed open source community projects.
> > Both have direct support for UCX and unless your needs are very specific,
> > the overhead of going through MPI is likely to be negligible. Both also
> > have proprietary derivatives, such as Cray MPI (MPICH derivative) and
> > Spectrum MPI (Open MPI derivative), which may have optimizations for
> > proprietary networks. Both MPICH and Open MPI can be built without UCX, and
> > this is often easier (UCX 'master' is more volatile in my experience).
> > > >>>>>
> > > >>>>> The vast majority of distributed memory scientific applications
> > use MPI or higher level libraries, rather than writing directly to UCX
> > (which provides less coverage of HPC networks). I think MPI compatibility
> > is important.
> > > >>>>>
> > > >>>>>   From way up-thread (sorry):
> > > >>>>>
> > > >>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As
> > another
> > > >>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with
> > the HPC
> > > >>>>>>>>>>>> space.
> > > >>>>>
> > > >>>>> MPI has collective operations like MPI_Allreduce (perform a
> > reduction and give every process the result; these run in log(P) or better
> > time with small constants -- 15 microseconds is typical for a cheap
> > reduction operation on a million processes). MPI supports user-defined
> > operations for reductions and prefix-scan operations. If we defined MPI_Ops
> > for Arrow types, we could compute summary statistics and other algorithmic
> > building blocks fast at arbitrary scale.
> > > >>>>>
> > > >>>>> The collective execution model might not be everyone's bag, but
> > MPI_Op can also be used in one-sided operations (MPI_Accumulate and
> > MPI_Fetch_and_op) and dropping into collective mode has big advantages for
> > certain algorithms in computational statistics/machine learning.
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> >
> 
> 
> -- 
> Supun Kamburugamuve
> 

Re: Arrow in HPC

Posted by Supun Kamburugamuve <su...@apache.org>.
One general observation. I think this implementation uses polling to check
progress. Because of the client-server semantics of Arrow Flight, you may need
to use interrupt-based polling (e.g. epoll) to avoid busy looping.
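
A minimal sketch of such an interrupt-driven progress loop using UCX's wakeup support (assuming a ucp_worker created with UCP_FEATURE_WAKEUP; error handling omitted):

// Sketch only: block on the UCX worker's event FD with epoll instead of
// busy-polling. Assumes the worker was created with UCP_FEATURE_WAKEUP.
#include <sys/epoll.h>
#include <ucp/api/ucp.h>

void ProgressLoop(ucp_worker_h worker) {
  int worker_fd = -1;
  ucp_worker_get_efd(worker, &worker_fd);   // FD that signals new events

  int epfd = epoll_create1(0);
  struct epoll_event ev = {};
  ev.events = EPOLLIN;
  ev.data.fd = worker_fd;
  epoll_ctl(epfd, EPOLL_CTL_ADD, worker_fd, &ev);

  for (;;) {  // a real loop would also check a shutdown flag
    // Drain all pending work first.
    while (ucp_worker_progress(worker) != 0) {
    }
    // Re-arm; UCS_ERR_BUSY means more events arrived, so progress again.
    if (ucp_worker_arm(worker) == UCS_ERR_BUSY) {
      continue;
    }
    // Nothing pending: sleep until the worker FD becomes readable.
    struct epoll_event out;
    epoll_wait(epfd, &out, 1, /*timeout=*/-1);
  }
}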

Best,
Supun..

On Tue, Jan 18, 2022 at 8:13 AM David Li <li...@apache.org> wrote:

> Thanks for those results, Yibo! Looks like there's still more room for
> improvement here. Yes, things are a little unstable, though I didn't
> have much trouble just starting the benchmark - I will need
> to find suitable hardware and iron out these issues. Note that I've
> only implemented DoGet, and I haven't implemented concurrent streams,
> which would explain why most benchmark configurations hang or error.
>
> Since the last time, I've rewritten the prototype to use UCX's "active
> message" functionality instead of trying to implement messages over
> the "streams" API. This simplified the code. I also did some
> refactoring along the lines of Yibo's prototype to share more code
> between the gRPC and UCX implementations. Here are some benchmark
> numbers:
>
> For IPC (server/client on the same machine): UCX with shared memory
> handily beats gRPC here. UCX with TCP isn't quite up to par, though.
>
> gRPC:
> 128KiB batches: 4463 MiB/s
> 2MiB batches:   3537 MiB/s
> 32MiB batches:  1828 MiB/s
>
> UCX (shared memory):
> 128KiB batches: 6500 MiB/s
> 2MiB batches:  13879 MiB/s
> 32MiB batches:  9045 MiB/s
>
> UCX (TCP):
> 128KiB batches: 1069 MiB/s
> 2MiB batches:   1735 MiB/s
> 32MiB batches:  1602 MiB/s
>
> For RPC (server/client on different machines): Two t3.xlarge (4 core,
> 16 thread) machines were used in AWS EC2. These have "up to" 5Gbps
> bandwidth. This isn't really a scenario where UCX is expected to
> shine, however, UCX performs comparably to gRPC here.
>
> gRPC:
> 128 KiB batches: 554 MiB/s
> 2 MiB batches:   575 MiB/s
>
> UCX:
> 128 KiB batches: 546 MiB/s
> 2 MiB batches:   567 MiB/s
>
> Raw test logs can be found here:
> https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc
>
> For IPC, the shared memory results are promising in that it could be
> feasible to expose a library purely over Flight without worrying about
> FFI bindings. Also, it seems results are roughly comparable to what
> Yibo observed in ARROW-15282 [1] meaning UCX will get us both a
> performant shared memory transport and support for more exotic
> hardware.
>
> There's still much work to be done; at this point, I'd like to start
> implementing the rest of the Flight methods, fixing up the many TODOs
> scattered around, trying to refactor more things to share code between
> gRPC/UCX, and find and benchmark some hardware that UCX has a fast
> path for.
>
> [1]: https://issues.apache.org/jira/browse/ARROW-15282
>
> -David
>
> On Tue, Jan 18, 2022, at 04:35, Yibo Cai wrote:
> > Some updates.
> >
> > I tested David's UCX transport patch over 100Gb network. FlightRPC over
> > UCX/RDMA improves throughput about 50%, with lower and flat latency.
> > And I think there are chances to improve further. See test report [1].
> >
> > For the data plane approach, the PoC shared memory data plane also
> > introduces a significant performance boost. Details at [2].
> >
> > Glad to see there is big potential to improve FlightRPC performance.
> >
> > [1] https://issues.apache.org/jira/browse/ARROW-15229
> > [2] https://issues.apache.org/jira/browse/ARROW-15282
> >
> > On 12/30/21 11:57 PM, David Li wrote:
> > > Ah, I see.
> > >
> > > I think both projects can proceed as well. At some point we will have
> to figure out how to merge them, but I think it's too early to see how
> exactly we will want to refactor things.
> > >
> > > I looked over the code and I don't have any important comments for
> now. Looking forward to reviewing when it's ready.
> > >
> > > -David
> > >
> > > On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
> > >>
> > >>
> > >> On 12/29/21 11:03 PM, David Li wrote:
> > >>> Awesome, thanks for sharing this too!
> > >>>
> > >>> The refactoring you have with DataClientStream what I would like to
> do as well - I think much of the existing code can be adapted to be more
> transport-agnostic and then it will be easier to support new transports
> (whether data-only or for all methods).
> > >>>
> > >>> Where do you see the gaps between gRPC and this? I think what would
> happen is 1) client calls GetFlightInfo 2) server returns a `shm://` URI 3)
> client sees the unfamiliar prefix and creates a new client for the DoGet
> call (it would have to do this anyways if, for instance, the GetFlightInfo
> call returned the address of a different server).
> > >>>
> > >>
> > >> I mean implementation details. Some unit test runs longer than
> expected
> > >> (data plane timeouts reading from an ended stream). Looks grpc stream
> > >> finish message is not correctly intercepted and forwarded to data
> plane.
> > >> I don't think it's big problem, just need some time to debug.
> > >>
> > >>> I also wonder how this stacks up to UCX's shared memory backend (I
> did not test this though).
> > >>>
> > >>
> > >> I implemented a shared memory data plane only to verify and
> consolidate
> > >> the data plane design, as it's the easiest (and useful) driver. I also
> > >> plan to implement a socket based data plane, not useful in practice,
> > >> only to make sure the design works ok across network. Then we can add
> > >> more useful drivers like UCX or DPDK (the benefit of DPDK is it works
> on
> > >> commodity hardware, unlike UCX/RDMA which requires expensive
> equipment).
> > >>
> > >>> I would like to be able to support entire new transports for certain
> cases (namely browser support - though perhaps one of the gRPC proxies
> would suffice there), but even in that case, we could make it so that a new
> transport only needs to implement the data plane methods. Only having to
> support the data plane methods would save significant implementation effort
> for all non-browser cases so I think it's a worthwhile approach.
> > >>>
> > >>
> > >> Thanks for being interest in this approach. My current plan is to
> first
> > >> refactor shared memory data plane to verify it beats grpc in local rpc
> > >> by considerable margin, otherwise there must be big mistakes in my
> > >> design. After that I will fix unit test issues and deliver for
> community
> > >> review.
> > >>
> > >> Anyway, don't let me block your implementations. And if you think it's
> > >> useful, I can push current code for more detailed discussion.
> > >>
> > >>> -David
> > >>>
> > >>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
> > >>>> Thanks David to initiate UCX integration, great work!
> > >>>> I think 5Gbps network is too limited for performance evaluation. I
> will try the patch on 100Gb RDMA network, hopefully we can see some
> improvements.
> > >>>> I once benchmarked flight over 100Gb network [1], grpc based
> throughput is 2.4GB/s for one thread, 8.8GB/s for six threads, about 60us
> latency. I also benchmarked raw RDMA performance (same batch sizes as
> flight), one thread can achive 9GB/s with 12us latency. Of couse the
> comparison is not fair. With David's patch, we can get a more realistic
> comparison.
> > >>>>
> > >>>> I'm implementing a data plane approach to hope we can adopt new
> data acceleration methods easily. My approach is to replace only the
> FlighData transmission of DoGet/Put/Exchange with data plane drivers, and
> grpc is still used for all rpc calls.
> > >>>> Code is at my github repo [2]. Besides the framework, I just
> implemented a shared memory data plane driver as PoC. Get/Put/Exchange unit
> tests passed, TestCancel hangs, some unit tests run longer than expected,
> still debugging. The shared memory data plane performance is pretty bad
> now, due to repeated map/unmap for each read/write, pre-allocated pages
> should improve much, still experimenting.
> > >>>>
> > >>>> Would like to hear community comments.
> > >>>>
> > >>>> My personal opinion is the data plane approach reuses grpc control
> plane, may be easier to add new data acceleration methods, but it needs to
> fit into grpc seamlessly (there're still gaps not resolved). A new tranport
> requires much more initial effort, but may payoff later. And looks these
> two approaches don't conflict with each other.
> > >>>>
> > >>>> [1] Test environment
> > >>>> nics: mellanox connectx5
> > >>>> hosts: client (neoverse n1), server (xeon gold 5218)
> > >>>> os: ubuntu 20.04, linux kernel 5.4
> > >>>> test case: 128k batch size, DoGet
> > >>>>
> > >>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
> > >>>>
> > >>>> ________________________________
> > >>>> From: David Li <li...@apache.org>
> > >>>> Sent: Wednesday, December 29, 2021 3:09 AM
> > >>>> To: dev@arrow.apache.org <de...@arrow.apache.org>
> > >>>> Subject: Re: Arrow in HPC
> > >>>>
> > >>>> I ended up drafting an implementation of Flight based on UCX, and
> doing some
> > >>>> of the necessary refactoring to support additional backends in the
> future.
> > >>>> It can run the Flight benchmark, and performance is about
> comparable to
> > >>>> gRPC, as tested on AWS EC2.
> > >>>>
> > >>>> The implementation is based on the UCP streams API. It's extremely
> > >>>> bare-bones and is really only a proof of concept; a good amount of
> work is
> > >>>> needed to turn it into a usable implementation. I had hoped it
> would perform
> > >>>> markedly better than gRPC, at least in this early test, but this
> seems not
> > >>>> to be the case. That said: I am likely not using UCX properly, UCX
> would
> > >>>> still open up support for additional hardware, and this work should
> allow
> > >>>> other backends to be implemented more easily.
> > >>>>
> > >>>> The branch can be viewed at
> > >>>> https://github.com/lidavidm/arrow/tree/flight-ucx
> > >>>>
> > >>>> I've attached the benchmark output at the end.
> > >>>>
> > >>>> There are still quite a few TODOs and things that need
> investigating:
> > >>>>
> > >>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at
> that.
> > >>>> - Concurrent requests are not supported, or even making more than
> one
> > >>>>     request on a connection, nor does the server support concurrent
> clients.
> > >>>>     We also need to decide whether to even support concurrent
> requests, and
> > >>>>     how (e.g. pooling multiple connections, or implementing a
> gRPC/HTTP2 style
> > >>>>     protocol, or even possibly implementing HTTP2).
> > >>>> - We need to make sure we properly handle errors, etc. everywhere.
> > >>>> - Are we using UCX in a performant and idiomatic manner? Will the
> > >>>>     implementation work well on RDMA and other specialized hardware?
> > >>>> - Do we also need to support the UCX tag API?
> > >>>> - Can we refactor out interfaces that allow sharing more of the
> > >>>>     client/server implementation between different backends?
> > >>>> - Are the abstractions sufficient to support other potential
> backends like
> > >>>>     MPI, libfabrics, or WebSockets?
> > >>>>
> > >>>> If anyone has experience with UCX, I'd appreciate any feedback.
> Otherwise,
> > >>>> I'm hoping to plan out and try to tackle some of the TODOs above,
> and figure
> > >>>> out how this effort can proceed.
> > >>>>
> > >>>> Antoine/Micah raised the possibility of extending gRPC instead.
> That would
> > >>>> be preferable, frankly, given otherwise we'd might have to
> re-implement a
> > >>>> lot of what gRPC and HTTP2 provide by ourselves. However, the
> necessary
> > >>>> proposal stalled and was dropped without much discussion:
> > >>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
> > >>>>
> > >>>> Benchmark results (also uploaded at
> > >>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
> > >>>>
> > >>>> Testing was done between two t3.xlarge instances in the same zone.
> > >>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
> > >>>>
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> -records_per_batch=4096
> > >>>> Testing method: DoGet
> > >>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627
> UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > >>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627
> UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > >>>> Number of perf runs: 1
> > >>>> Number of concurrent gets/puts: 1
> > >>>> Batch size: 131072
> > >>>> Batches read: 10000
> > >>>> Bytes read: 1310720000
> > >>>> Nanos: 2165862969
> > >>>> Speed: 577.137 MB/s
> > >>>> Throughput: 4617.1 batches/s
> > >>>> Latency mean: 214 us
> > >>>> Latency quantile=0.5: 209 us
> > >>>> Latency quantile=0.95: 340 us
> > >>>> Latency quantile=0.99: 409 us
> > >>>> Latency max: 6350 us
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> -records_per_batch=65536
> > >>>> Testing method: DoGet
> > >>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627
> UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > >>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627
> UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > >>>> Number of perf runs: 1
> > >>>> Number of concurrent gets/puts: 1
> > >>>> Batch size: 2097152
> > >>>> Batches read: 10000
> > >>>> Bytes read: 20971520000
> > >>>> Nanos: 34184175236
> > >>>> Speed: 585.066 MB/s
> > >>>> Throughput: 292.533 batches/s
> > >>>> Latency mean: 3415 us
> > >>>> Latency quantile=0.5: 3408 us
> > >>>> Latency quantile=0.95: 3549 us
> > >>>> Latency quantile=0.99: 3800 us
> > >>>> Latency max: 20236 us
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> -records_per_batch=4096
> > >>>> Testing method: DoGet
> > >>>> Using standalone TCP server
> > >>>> Server host: 172.31.34.4
> > >>>> Server port: 31337
> > >>>> Number of perf runs: 1
> > >>>> Number of concurrent gets/puts: 1
> > >>>> Batch size: 131072
> > >>>> Batches read: 10000
> > >>>> Bytes read: 1310720000
> > >>>> Nanos: 2375289668
> > >>>> Speed: 526.252 MB/s
> > >>>> Throughput: 4210.01 batches/s
> > >>>> Latency mean: 235 us
> > >>>> Latency quantile=0.5: 203 us
> > >>>> Latency quantile=0.95: 328 us
> > >>>> Latency quantile=0.99: 1377 us
> > >>>> Latency max: 17860 us
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> -records_per_batch=65536
> > >>>> Testing method: DoGet
> > >>>> Using standalone TCP server
> > >>>> Server host: 172.31.34.4
> > >>>> Server port: 31337
> > >>>> Number of perf runs: 1
> > >>>> Number of concurrent gets/puts: 1
> > >>>> Batch size: 2097152
> > >>>> Batches read: 10000
> > >>>> Bytes read: 20971520000
> > >>>> Nanos: 34202704498
> > >>>> Speed: 584.749 MB/s
> > >>>> Throughput: 292.375 batches/s
> > >>>> Latency mean: 3416 us
> > >>>> Latency quantile=0.5: 3406 us
> > >>>> Latency quantile=0.95: 3548 us
> > >>>> Latency quantile=0.99: 3764 us
> > >>>> Latency max: 17086 us
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4
> -p 1337 -Z -l 1M
> > >>>> Connecting to host 172.31.34.4, port 1337
> > >>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port
> 1337
> > >>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
> > >>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35
> MBytes
> > >>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43
> MBytes
> > >>>> - - - - - - - - - - - - - - - - - - - - - - - - -
> > >>>> [ ID] Interval           Transfer     Bitrate         Retr
> > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36
>    sender
> > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec
>   receiver
> > >>>>
> > >>>> iperf Done.
> > >>>>
> > >>>> Best,
> > >>>> David
> > >>>>
> > >>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> > >>>>> "David Li" <li...@apache.org> writes:
> > >>>>>
> > >>>>>> Thanks for the clarification Yibo, looking forward to the
> results. Even if it is a very hacky PoC it will be interesting to see how
> it affects performance, though as Keith points out there are benefits in
> general to UCX (or similar library), and we can work out the implementation
> plan from there.
> > >>>>>>
> > >>>>>> To Benson's point - the work done to get UCX supported would pave
> the way to supporting other backends as well. I'm personally not familiar
> with UCX, MPI, etc. so is MPI here more about playing well with established
> practices or does it also offer potential hardware support/performance
> improvements like UCX would?
> > >>>>>
> > >>>>> There are two main implementations of MPI, MPICH and Open MPI,
> both of which are permissively licensed open source community projects.
> Both have direct support for UCX and unless your needs are very specific,
> the overhead of going through MPI is likely to be negligible. Both also
> have proprietary derivatives, such as Cray MPI (MPICH derivative) and
> Spectrum MPI (Open MPI derivative), which may have optimizations for
> proprietary networks. Both MPICH and Open MPI can be built without UCX, and
> this is often easier (UCX 'master' is more volatile in my experience).
> > >>>>>
> > >>>>> The vast majority of distributed memory scientific applications
> use MPI or higher level libraries, rather than writing directly to UCX
> (which provides less coverage of HPC networks). I think MPI compatibility
> is important.
> > >>>>>
> > >>>>>   From way up-thread (sorry):
> > >>>>>
> > >>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As
> another
> > >>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with
> the HPC
> > >>>>>>>>>>>> space.
> > >>>>>
> > >>>>> MPI has collective operations like MPI_Allreduce (perform a
> reduction and give every process the result; these run in log(P) or better
> time with small constants -- 15 microseconds is typical for a cheap
> reduction operation on a million processes). MPI supports user-defined
> operations for reductions and prefix-scan operations. If we defined MPI_Ops
> for Arrow types, we could compute summary statistics and other algorithmic
> building blocks fast at arbitrary scale.
> > >>>>>
> > >>>>> The collective execution model might not be everyone's bag, but
> MPI_Op can also be used in one-sided operations (MPI_Accumulate and
> MPI_Fetch_and_op) and dropping into collective mode has big advantages for
> certain algorithms in computational statistics/machine learning.
> > >>>>>
> > >>>> IMPORTANT NOTICE: The contents of this email and any attachments
> are confidential and may also be privileged. If you are not the intended
> recipient, please notify the sender immediately and do not disclose the
> contents to any other person, use it for any purpose, or store or copy the
> information in any medium. Thank you.
> > >>>>
> > >>>
> > >>
> > >
> >
>


-- 
Supun Kamburugamuve

Re: Arrow in HPC

Posted by David Li <li...@apache.org>.
Thanks for those results, Yibo! Looks like there's still more room for
improvement here. Yes, things are a little unstable, though I didn't
run into much trouble just starting the benchmark - I will need to
find suitable hardware and iron out these issues. Note that I've only
implemented DoGet, and I haven't implemented concurrent streams, which
would explain why most benchmark configurations hang or error.

Since the last time, I've rewritten the prototype to use UCX's "active
message" functionality instead of trying to implement messages over
the "streams" API. This simplified the code. I also did some
refactoring along the lines of Yibo's prototype to share more code
between the gRPC and UCX implementations. Here are some benchmark
numbers:

For IPC (server/client on the same machine): UCX with shared memory
handily beats gRPC here. UCX with TCP isn't quite up to par, though.

gRPC:
128KiB batches: 4463 MiB/s
2MiB batches:   3537 MiB/s
32MiB batches:  1828 MiB/s

UCX (shared memory):
128KiB batches: 6500 MiB/s
2MiB batches:  13879 MiB/s
32MiB batches:  9045 MiB/s

UCX (TCP):
128KiB batches: 1069 MiB/s
2MiB batches:   1735 MiB/s
32MiB batches:  1602 MiB/s

For RPC (server/client on different machines): Two t3.xlarge (4 vCPU,
16 GiB) machines were used in AWS EC2. These have "up to" 5 Gbps of
bandwidth. This isn't really a scenario where UCX is expected to
shine; still, UCX performs comparably to gRPC here.

gRPC:
128 KiB batches: 554 MiB/s
2 MiB batches:   575 MiB/s

UCX:
128 KiB batches: 546 MiB/s
2 MiB batches:   567 MiB/s

Raw test logs can be found here:
https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc

For IPC, the shared memory results are promising in that it could be
feasible to expose a library purely over Flight without worrying about
FFI bindings. Also, the results are roughly comparable to what Yibo
observed in ARROW-15282 [1], meaning UCX will get us both a performant
shared memory transport and support for more exotic hardware.

There's still much work to be done; at this point, I'd like to start
implementing the rest of the Flight methods, fixing up the many TODOs
scattered around, refactoring more things to share code between
gRPC/UCX, and finding and benchmarking hardware that UCX has a fast
path for.
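
For anyone curious, the "active message" rewrite mentioned above boils
down to registering a receive callback for an AM id and posting sends
against it. A rough sketch (context/worker/endpoint setup omitted, and
struct fields should be double-checked against ucp.h - this is only
illustrative, not the prototype's actual code):

    #include <ucp/api/ucp.h>
    #include <cstdio>

    static const unsigned kFlightDataAmId = 0xF1;  // arbitrary id for FlightData

    // Receive side: UCX invokes this whenever an AM with our id arrives.
    static ucs_status_t OnFlightData(void* arg, const void* header,
                                     size_t header_length, void* data,
                                     size_t length,
                                     const ucp_am_recv_param_t* param) {
      std::printf("got %zu bytes of FlightData payload\n", length);
      return UCS_OK;  // payload is not retained past this callback
    }

    static void RegisterFlightDataHandler(ucp_worker_h worker) {
      ucp_am_handler_param_t param;
      param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID |
                         UCP_AM_HANDLER_PARAM_FIELD_CB |
                         UCP_AM_HANDLER_PARAM_FIELD_ARG;
      param.id = kFlightDataAmId;
      param.cb = OnFlightData;
      param.arg = nullptr;
      ucp_worker_set_am_recv_handler(worker, &param);
    }

    // Send side: post one message and progress the worker until it completes.
    static ucs_status_t SendFlightData(ucp_worker_h worker, ucp_ep_h ep,
                                       const void* buf, size_t len) {
      ucp_request_param_t param = {};
      ucs_status_ptr_t req = ucp_am_send_nbx(ep, kFlightDataAmId,
                                             /*header=*/nullptr, 0, buf, len,
                                             &param);
      if (req == nullptr) return UCS_OK;               // completed inline
      if (UCS_PTR_IS_ERR(req)) return UCS_PTR_STATUS(req);
      while (ucp_request_check_status(req) == UCS_INPROGRESS) {
        ucp_worker_progress(worker);
      }
      ucp_request_free(req);
      return UCS_OK;
    }

Compared to the streams API, message boundaries come for free and the
receive path is callback-driven, which is presumably where the
simplification came from.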

[1]: https://issues.apache.org/jira/browse/ARROW-15282

-David

On Tue, Jan 18, 2022, at 04:35, Yibo Cai wrote:
> Some updates.
> 
> I tested David's UCX transport patch over 100Gb network. FlightRPC over 
> UCX/RDMA improves throughput about 50%, with lower and flat latency.
> And I think there are chances to improve further. See test report [1].
> 
> For the data plane approach, the PoC shared memory data plane also
> introduces a significant performance boost. Details at [2].
> 
> Glad to see there are big potentials to improve FlightRPC performance.
> 
> [1] https://issues.apache.org/jira/browse/ARROW-15229
> [2] https://issues.apache.org/jira/browse/ARROW-15282
> 
> On 12/30/21 11:57 PM, David Li wrote:
> > Ah, I see.
> > 
> > I think both projects can proceed as well. At some point we will have to figure out how to merge them, but I think it's too early to see how exactly we will want to refactor things.
> > 
> > I looked over the code and I don't have any important comments for now. Looking forward to reviewing when it's ready.
> > 
> > -David
> > 
> > On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
> >>
> >>
> >> On 12/29/21 11:03 PM, David Li wrote:
> >>> Awesome, thanks for sharing this too!
> >>>
> >>> The refactoring you have with DataClientStream what I would like to do as well - I think much of the existing code can be adapted to be more transport-agnostic and then it will be easier to support new transports (whether data-only or for all methods).
> >>>
> >>> Where do you see the gaps between gRPC and this? I think what would happen is 1) client calls GetFlightInfo 2) server returns a `shm://` URI 3) client sees the unfamiliar prefix and creates a new client for the DoGet call (it would have to do this anyways if, for instance, the GetFlightInfo call returned the address of a different server).
> >>>
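
As a concrete illustration of step 3 above (the types here are
hypothetical stand-ins, not Arrow's actual classes), the dispatch
amounts to picking a client implementation from the URI scheme:

    #include <memory>
    #include <stdexcept>
    #include <string>

    // Hypothetical client handles standing in for whatever the real gRPC
    // client and shared-memory data plane client end up being.
    struct DataStreamClient { virtual ~DataStreamClient() = default; };
    struct GrpcStreamClient : DataStreamClient {
      explicit GrpcStreamClient(const std::string&) {}
    };
    struct ShmStreamClient : DataStreamClient {
      explicit ShmStreamClient(const std::string&) {}
    };

    // Inspect the URI returned by GetFlightInfo and pick a client for DoGet.
    std::unique_ptr<DataStreamClient> MakeClientForLocation(const std::string& uri) {
      if (uri.rfind("grpc", 0) == 0) {    // grpc:// or grpc+tcp://
        return std::make_unique<GrpcStreamClient>(uri);
      }
      if (uri.rfind("shm://", 0) == 0) {  // unfamiliar prefix -> data plane client
        return std::make_unique<ShmStreamClient>(uri);
      }
      throw std::runtime_error("no transport registered for " + uri);
    }

    int main() {
      auto client = MakeClientForLocation("shm://flight-demo");  // made-up URI
      return client ? 0 : 1;
    }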
> >>
> >> I mean implementation details. Some unit tests run longer than expected
> >> (the data plane times out reading from an ended stream). It looks like
> >> the grpc stream finish message is not correctly intercepted and forwarded
> >> to the data plane. I don't think it's a big problem, it just needs some
> >> time to debug.
> >>
> >>> I also wonder how this stacks up to UCX's shared memory backend (I did not test this though).
> >>>
> >>
> >> I implemented a shared memory data plane only to verify and consolidate
> >> the data plane design, as it's the easiest (and useful) driver. I also
> >> plan to implement a socket based data plane, not useful in practice,
> >> only to make sure the design works ok across network. Then we can add
> >> more useful drivers like UCX or DPDK (the benefit of DPDK is it works on
> >> commodity hardware, unlike UCX/RDMA which requires expensive equipment).
> >>
> >>> I would like to be able to support entire new transports for certain cases (namely browser support - though perhaps one of the gRPC proxies would suffice there), but even in that case, we could make it so that a new transport only needs to implement the data plane methods. Only having to support the data plane methods would save significant implementation effort for all non-browser cases so I think it's a worthwhile approach.
> >>>
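
To make the "only implement the data plane methods" idea concrete, the
contract a new driver has to satisfy could be as small as something
like the following (purely illustrative; the interfaces in the
prototype branches will differ):

    #include <cstdint>
    #include <string>

    // Minimal sketch of a data-plane driver: control calls stay on gRPC,
    // and a driver only moves FlightData payloads for DoGet/DoPut/DoExchange.
    class DataPlaneDriver {
     public:
      virtual ~DataPlaneDriver() = default;
      // e.g. "shm", "ucx", "dpdk"; advertised to the peer via the location URI.
      virtual std::string scheme() const = 0;
      // Blocking send/receive of one serialized FlightData message.
      virtual bool Write(const uint8_t* data, int64_t size) = 0;
      virtual bool Read(uint8_t* out, int64_t capacity, int64_t* bytes_read) = 0;
      // Signal end-of-stream so the peer does not time out waiting for data.
      virtual void Finish() = 0;
    };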
> >>
> >> Thanks for being interested in this approach. My current plan is to first
> >> refactor the shared memory data plane to verify it beats grpc in local rpc
> >> by a considerable margin; otherwise there must be big mistakes in my
> >> design. After that I will fix the unit test issues and deliver it for
> >> community review.
> >>
> >> Anyway, don't let me block your implementations. And if you think it's
> >> useful, I can push current code for more detailed discussion.
> >>
> >>> -David
> >>>
> >>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
> >>>> Thanks David for initiating the UCX integration, great work!
> >>>> I think a 5Gbps network is too limited for performance evaluation. I will try the patch on a 100Gb RDMA network; hopefully we will see some improvements.
> >>>> I once benchmarked Flight over a 100Gb network [1]: grpc-based throughput is 2.4GB/s for one thread and 8.8GB/s for six threads, with about 60us latency. I also benchmarked raw RDMA performance (same batch sizes as Flight); one thread can achieve 9GB/s with 12us latency. Of course the comparison is not fair. With David's patch, we can get a more realistic comparison.
> >>>>
> >>>> I'm implementing a data plane approach in the hope that we can adopt new data acceleration methods easily. My approach is to replace only the FlightData transmission of DoGet/Put/Exchange with data plane drivers, while grpc is still used for all rpc calls.
> >>>> Code is at my github repo [2]. Besides the framework, I have just implemented a shared memory data plane driver as a PoC. Get/Put/Exchange unit tests pass, TestCancel hangs, and some unit tests run longer than expected; still debugging. The shared memory data plane performance is pretty bad now, due to repeated map/unmap for each read/write; pre-allocated pages should improve this a lot, still experimenting.
> >>>>
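
On the repeated map/unmap overhead mentioned above: the usual fix is to
shm_open/mmap a region once per stream and reuse it for every batch. A
minimal POSIX sketch of the writer side, with made-up names and error
handling trimmed:

    #include <cstring>
    #include <fcntl.h>
    #include <sys/mman.h>
    #include <unistd.h>

    int main() {
      const char* name = "/flight-data-plane-demo";  // made-up segment name
      const size_t kSegmentSize = 4 << 20;           // pre-allocate 4 MiB once

      int fd = shm_open(name, O_CREAT | O_RDWR, 0600);
      if (fd < 0) return 1;
      if (ftruncate(fd, kSegmentSize) != 0) return 1;

      // Map once up front; each batch is then a plain memcpy into the mapping
      // instead of a fresh map/unmap per read/write.
      void* base = mmap(nullptr, kSegmentSize, PROT_READ | PROT_WRITE,
                        MAP_SHARED, fd, 0);
      if (base == MAP_FAILED) return 1;

      const char payload[] = "pretend this is a serialized FlightData message";
      for (int batch = 0; batch < 1000; ++batch) {
        std::memcpy(base, payload, sizeof(payload));  // reuse the same mapping
      }

      munmap(base, kSegmentSize);
      close(fd);
      shm_unlink(name);  // removes the name; existing mappings remain valid
      return 0;
    }

A real driver additionally needs a ring buffer or length-prefixed slots
plus a notification mechanism so the reader knows when a batch is
ready, but the map-once pattern is where the win over per-message
map/unmap comes from.
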
> >>>> Would like to hear community comments.
> >>>>
> >>>> My personal opinion: the data plane approach reuses the grpc control plane and may make it easier to add new data acceleration methods, but it needs to fit into grpc seamlessly (there are still unresolved gaps). A new transport requires much more initial effort, but may pay off later. And it looks like these two approaches don't conflict with each other.
> >>>>
> >>>> [1] Test environment
> >>>> nics: mellanox connectx5
> >>>> hosts: client (neoverse n1), server (xeon gold 5218)
> >>>> os: ubuntu 20.04, linux kernel 5.4
> >>>> test case: 128k batch size, DoGet
> >>>>
> >>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
> >>>>
> >>>> ________________________________
> >>>> From: David Li <li...@apache.org>
> >>>> Sent: Wednesday, December 29, 2021 3:09 AM
> >>>> To: dev@arrow.apache.org <de...@arrow.apache.org>
> >>>> Subject: Re: Arrow in HPC
> >>>>
> >>>> I ended up drafting an implementation of Flight based on UCX, and doing some
> >>>> of the necessary refactoring to support additional backends in the future.
> >>>> It can run the Flight benchmark, and performance is about comparable to
> >>>> gRPC, as tested on AWS EC2.
> >>>>
> >>>> The implementation is based on the UCP streams API. It's extremely
> >>>> bare-bones and is really only a proof of concept; a good amount of work is
> >>>> needed to turn it into a usable implementation. I had hoped it would perform
> >>>> markedly better than gRPC, at least in this early test, but this seems not
> >>>> to be the case. That said: I am likely not using UCX properly, UCX would
> >>>> still open up support for additional hardware, and this work should allow
> >>>> other backends to be implemented more easily.
> >>>>
> >>>> The branch can be viewed at
> >>>> https://github.com/lidavidm/arrow/tree/flight-ucx
> >>>>
> >>>> I've attached the benchmark output at the end.
> >>>>
> >>>> There are still quite a few TODOs and things that need investigating:
> >>>>
> >>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at that.
> >>>> - Concurrent requests are not supported, or even making more than one
> >>>>     request on a connection, nor does the server support concurrent clients.
> >>>>     We also need to decide whether to even support concurrent requests, and
> >>>>     how (e.g. pooling multiple connections, or implementing a gRPC/HTTP2 style
> >>>>     protocol, or even possibly implementing HTTP2).
> >>>> - We need to make sure we properly handle errors, etc. everywhere.
> >>>> - Are we using UCX in a performant and idiomatic manner? Will the
> >>>>     implementation work well on RDMA and other specialized hardware?
> >>>> - Do we also need to support the UCX tag API?
> >>>> - Can we refactor out interfaces that allow sharing more of the
> >>>>     client/server implementation between different backends?
> >>>> - Are the abstractions sufficient to support other potential backends like
> >>>>     MPI, libfabrics, or WebSockets?
> >>>>
> >>>> If anyone has experience with UCX, I'd appreciate any feedback. Otherwise,
> >>>> I'm hoping to plan out and try to tackle some of the TODOs above, and figure
> >>>> out how this effort can proceed.
> >>>>
> >>>> Antoine/Micah raised the possibility of extending gRPC instead. That would
> >>>> be preferable, frankly, given that otherwise we might have to re-implement a
> >>>> lot of what gRPC and HTTP2 provide by ourselves. However, the necessary
> >>>> proposal stalled and was dropped without much discussion:
> >>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
> >>>>
> >>>> Benchmark results (also uploaded at
> >>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
> >>>>
> >>>> Testing was done between two t3.xlarge instances in the same zone.
> >>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
> >>>>
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
> >>>> Testing method: DoGet
> >>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >>>> Number of perf runs: 1
> >>>> Number of concurrent gets/puts: 1
> >>>> Batch size: 131072
> >>>> Batches read: 10000
> >>>> Bytes read: 1310720000
> >>>> Nanos: 2165862969
> >>>> Speed: 577.137 MB/s
> >>>> Throughput: 4617.1 batches/s
> >>>> Latency mean: 214 us
> >>>> Latency quantile=0.5: 209 us
> >>>> Latency quantile=0.95: 340 us
> >>>> Latency quantile=0.99: 409 us
> >>>> Latency max: 6350 us
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
> >>>> Testing method: DoGet
> >>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >>>> Number of perf runs: 1
> >>>> Number of concurrent gets/puts: 1
> >>>> Batch size: 2097152
> >>>> Batches read: 10000
> >>>> Bytes read: 20971520000
> >>>> Nanos: 34184175236
> >>>> Speed: 585.066 MB/s
> >>>> Throughput: 292.533 batches/s
> >>>> Latency mean: 3415 us
> >>>> Latency quantile=0.5: 3408 us
> >>>> Latency quantile=0.95: 3549 us
> >>>> Latency quantile=0.99: 3800 us
> >>>> Latency max: 20236 us
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
> >>>> Testing method: DoGet
> >>>> Using standalone TCP server
> >>>> Server host: 172.31.34.4
> >>>> Server port: 31337
> >>>> Number of perf runs: 1
> >>>> Number of concurrent gets/puts: 1
> >>>> Batch size: 131072
> >>>> Batches read: 10000
> >>>> Bytes read: 1310720000
> >>>> Nanos: 2375289668
> >>>> Speed: 526.252 MB/s
> >>>> Throughput: 4210.01 batches/s
> >>>> Latency mean: 235 us
> >>>> Latency quantile=0.5: 203 us
> >>>> Latency quantile=0.95: 328 us
> >>>> Latency quantile=0.99: 1377 us
> >>>> Latency max: 17860 us
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
> >>>> Testing method: DoGet
> >>>> Using standalone TCP server
> >>>> Server host: 172.31.34.4
> >>>> Server port: 31337
> >>>> Number of perf runs: 1
> >>>> Number of concurrent gets/puts: 1
> >>>> Batch size: 2097152
> >>>> Batches read: 10000
> >>>> Bytes read: 20971520000
> >>>> Nanos: 34202704498
> >>>> Speed: 584.749 MB/s
> >>>> Throughput: 292.375 batches/s
> >>>> Latency mean: 3416 us
> >>>> Latency quantile=0.5: 3406 us
> >>>> Latency quantile=0.95: 3548 us
> >>>> Latency quantile=0.99: 3764 us
> >>>> Latency max: 17086 us
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z -l 1M
> >>>> Connecting to host 172.31.34.4, port 1337
> >>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
> >>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
> >>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
> >>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
> >>>> - - - - - - - - - - - - - - - - - - - - - - - - -
> >>>> [ ID] Interval           Transfer     Bitrate         Retr
> >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36             sender
> >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                  receiver
> >>>>
> >>>> iperf Done.
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> >>>>> "David Li" <li...@apache.org> writes:
> >>>>>
> >>>>>> Thanks for the clarification Yibo, looking forward to the results. Even if it is a very hacky PoC it will be interesting to see how it affects performance, though as Keith points out there are benefits in general to UCX (or similar library), and we can work out the implementation plan from there.
> >>>>>>
> >>>>>> To Benson's point - the work done to get UCX supported would pave the way to supporting other backends as well. I'm personally not familiar with UCX, MPI, etc. so is MPI here more about playing well with established practices or does it also offer potential hardware support/performance improvements like UCX would?
> >>>>>
> >>>>> There are two main implementations of MPI, MPICH and Open MPI, both of which are permissively licensed open source community projects. Both have direct support for UCX and unless your needs are very specific, the overhead of going through MPI is likely to be negligible. Both also have proprietary derivatives, such as Cray MPI (MPICH derivative) and Spectrum MPI (Open MPI derivative), which may have optimizations for proprietary networks. Both MPICH and Open MPI can be built without UCX, and this is often easier (UCX 'master' is more volatile in my experience).
> >>>>>
> >>>>> The vast majority of distributed memory scientific applications use MPI or higher level libraries, rather than writing directly to UCX (which provides less coverage of HPC networks). I think MPI compatibility is important.
> >>>>>
> >>>>> From way up-thread (sorry):
> >>>>>
> >>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As another
> >>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with the HPC
> >>>>>>>>>>>> space.
> >>>>>
> >>>>> MPI has collective operations like MPI_Allreduce (perform a reduction and give every process the result; these run in log(P) or better time with small constants -- 15 microseconds is typical for a cheap reduction operation on a million processes). MPI supports user-defined operations for reductions and prefix-scan operations. If we defined MPI_Ops for Arrow types, we could compute summary statistics and other algorithmic building blocks fast at arbitrary scale.
> >>>>>
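
As a concrete example of the MPI_Op idea, summing a column of doubles
across all ranks with a user-defined reduction looks roughly like the
following (a plain double array stands in for an Arrow buffer; a real
Arrow-aware MPI_Op would also have to combine validity bitmaps and
handle nulls):

    #include <mpi.h>
    #include <vector>

    // User-defined reduction: element-wise sum over a contiguous double buffer.
    static void SumDoubles(void* in, void* inout, int* len, MPI_Datatype*) {
      const double* a = static_cast<const double*>(in);
      double* b = static_cast<double*>(inout);
      for (int i = 0; i < *len; ++i) b[i] += a[i];
    }

    int main(int argc, char** argv) {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      std::vector<double> column(1024, static_cast<double>(rank));  // fake column chunk
      std::vector<double> sums(column.size(), 0.0);

      MPI_Op sum_op;
      MPI_Op_create(&SumDoubles, /*commute=*/1, &sum_op);
      // Every rank ends up with the element-wise sum of all ranks' columns.
      MPI_Allreduce(column.data(), sums.data(), static_cast<int>(column.size()),
                    MPI_DOUBLE, sum_op, MPI_COMM_WORLD);
      MPI_Op_free(&sum_op);

      MPI_Finalize();
      return 0;
    }
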
> >>>>> The collective execution model might not be everyone's bag, but MPI_Op can also be used in one-sided operations (MPI_Accumulate and MPI_Fetch_and_op) and dropping into collective mode has big advantages for certain algorithms in computational statistics/machine learning.
> >>>>>
> >>>>
> >>>
> >>
> > 
>