Posted to dev@arrow.apache.org by David Li <li...@apache.org> on 2022/04/07 17:34:08 UTC

Re: Arrow in HPC

Just as an update: thanks to Yibo for the reviews; we've merged an initial implementation that will be available in Arrow 8.0.0 (if built from source). There's definitely more work to do:

ARROW-10787 [C++][Flight] DoExchange doesn't support dictionary replacement
ARROW-15756 [C++][FlightRPC] Benchmark in-process Flight performance
ARROW-15835 [C++][FlightRPC] Refactor auth, middleware into the transport-agnostic layer
ARROW-15836 [C++][FlightRPC] Refactor remaining methods into transport-agnostic handlers
ARROW-16069 [C++][FlightRPC] Refactor error statuses/codes into the transport-agnostic layer
ARROW-16124 [C++][FlightRPC] UCX server should be able to shed load
ARROW-16125 [C++][FlightRPC] Implement shutdown with deadline for UCX
ARROW-16126 [C++][FlightRPC] Pipeline memory allocation/registration
ARROW-16127 [C++][FlightRPC] Improve concurrent call implementation in UCX client
ARROW-16135 [C++][FlightRPC] Investigate TSAN with gRPC/UCX tests

However, it should be usable, and any feedback from intrepid users would be very welcome.
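
For anyone who wants to try it, here is a minimal C++ sketch of what connecting
over the new transport might look like. It is only an illustration: the "ucx://"
scheme and port 31337 are taken from the benchmark runs quoted below, the "ints"
ticket is a made-up placeholder the server would have to understand, and the
Result-returning client methods are assumed from recent Arrow C++ releases
(exact signatures have shifted between versions).

#include <iostream>
#include <memory>
#include <string>

#include <arrow/flight/client.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>

namespace flight = arrow::flight;

arrow::Status RunDoGet(const std::string& uri) {
  // "ucx://host:port" selects the UCX transport; "grpc+tcp://host:port"
  // selects the default gRPC transport.
  ARROW_ASSIGN_OR_RAISE(auto location, flight::Location::Parse(uri));
  ARROW_ASSIGN_OR_RAISE(auto client, flight::FlightClient::Connect(location));

  // Hypothetical ticket; a real server defines which tickets it accepts.
  flight::Ticket ticket{"ints"};
  ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(ticket));
  ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  return arrow::Status::OK();
}

int main() {
  arrow::Status st = RunDoGet("ucx://127.0.0.1:31337");
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  return 0;
}

Pointing the same code at a grpc+tcp:// location switches back to the default
transport, which is essentially what the benchmark's -transport flag toggles.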

On Fri, Mar 18, 2022, at 14:45, David Li wrote:
> For anyone interested, the PR is finally up and ready: 
> https://github.com/apache/arrow/pull/12442
>
> As part of this, Flight in C++ was refactored to allow plugging in 
> alternative transports. There's more work to be done there (auth, 
> middleware, etc. need to be uplifted into the common layer), but this 
> should enable UCX and potentially other network transports.
>
> There are still some caveats as described in the PR itself, including 
> some edge cases I need to track down and missing support for a variety 
> of features, but the core data plane methods are supported and the 
> Flight benchmark can be run.
>
> Thanks to Yibo Cai, Pavel Shamis, Antoine Pitrou (among others) for 
> assistance and review, and the HPC Advisory Council for granting access 
> to an HPC cluster to help with development and testing.
>
> On Tue, Jan 18, 2022, at 18:33, David Li wrote:
>> Ah, yes, thanks for the reminder. That's one of the things that needs 
>> to be addressed for sure.
>>
>> -David
>>
>> On Tue, Jan 18, 2022, at 17:48, Supun Kamburugamuve wrote:
>>> One general observation: I think this implementation uses polling to
>>> check progress. Because of the client-server semantics of Arrow Flight,
>>> you may need to use interrupt-based polling like epoll to avoid busy
>>> looping.
>>> 
>>> Best,
>>> Supun..
>>> 
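
For reference, a rough sketch of what interrupt-driven progress could look like
with UCX's wakeup API (ucp_worker_get_efd / ucp_worker_arm) instead of spinning
on ucp_worker_progress(). This is not the prototype's code: it assumes a worker
created with UCP_FEATURE_WAKEUP and leaves out error handling and shutdown.

#include <sys/epoll.h>
#include <ucp/api/ucp.h>

// Block in epoll until the UCX worker has events, then drain them.
// Requires the UCP context/worker to be created with UCP_FEATURE_WAKEUP.
void ProgressLoop(ucp_worker_h worker) {
  int worker_fd = -1;
  ucp_worker_get_efd(worker, &worker_fd);

  int epoll_fd = epoll_create1(0);
  epoll_event ev{};
  ev.events = EPOLLIN;
  ev.data.fd = worker_fd;
  epoll_ctl(epoll_fd, EPOLL_CTL_ADD, worker_fd, &ev);

  while (true) {  // a real loop would also check a shutdown flag
    // Drain all pending work without sleeping.
    while (ucp_worker_progress(worker) != 0) {
    }
    // Re-arm the worker; UCS_ERR_BUSY means new events arrived in the
    // meantime, so go back to progressing instead of sleeping.
    if (ucp_worker_arm(worker) == UCS_ERR_BUSY) {
      continue;
    }
    // Sleep until the worker's event fd becomes readable.
    epoll_event out{};
    epoll_wait(epoll_fd, &out, 1, /*timeout=*/-1);
  }
}
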
>>> On Tue, Jan 18, 2022 at 8:13 AM David Li <li...@apache.org> wrote:
>>> 
>>> > Thanks for those results, Yibo! Looks like there's still more room for
>>> > improvement here. Yes, things are a little unstable, though I didn't
>>> > run into much trouble just starting the benchmark - I will need
>>> > to find suitable hardware and iron out these issues. Note that I've
>>> > only implemented DoGet, and I haven't implemented concurrent streams,
>>> > which would explain why most benchmark configurations hang or error.
>>> >
>>> > Since the last time, I've rewritten the prototype to use UCX's "active
>>> > message" functionality instead of trying to implement messages over
>>> > the "streams" API. This simplified the code. I also did some
>>> > refactoring along the lines of Yibo's prototype to share more code
>>> > between the gRPC and UCX implementations. Here are some benchmark
>>> > numbers:
>>> >
>>> > For IPC (server/client on the same machine): UCX with shared memory
>>> > handily beats gRPC here. UCX with TCP isn't quite up to par, though.
>>> >
>>> > gRPC:
>>> > 128KiB batches: 4463 MiB/s
>>> > 2MiB batches:   3537 MiB/s
>>> > 32MiB batches:  1828 MiB/s
>>> >
>>> > UCX (shared memory):
>>> > 128KiB batches: 6500 MiB/s
>>> > 2MiB batches:  13879 MiB/s
>>> > 32MiB batches:  9045 MiB/s
>>> >
>>> > UCX (TCP):
>>> > 128KiB batches: 1069 MiB/s
>>> > 2MiB batches:   1735 MiB/s
>>> > 32MiB batches:  1602 MiB/s
>>> >
>>> > For RPC (server/client on different machines): Two t3.xlarge (4 vCPU,
>>> > 16 GiB) machines were used in AWS EC2. These have "up to" 5Gbps
>>> > bandwidth. This isn't really a scenario where UCX is expected to
>>> > shine; however, UCX performs comparably to gRPC here.
>>> >
>>> > gRPC:
>>> > 128 KiB batches: 554 MiB/s
>>> > 2 MiB batches:   575 MiB/s
>>> >
>>> > UCX:
>>> > 128 KiB batches: 546 MiB/s
>>> > 2 MiB batches:   567 MiB/s
>>> >
>>> > Raw test logs can be found here:
>>> > https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc
>>> >
>>> > For IPC, the shared memory results are promising in that it could be
>>> > feasible to expose a library purely over Flight without worrying about
>>> > FFI bindings. Also, it seems results are roughly comparable to what
>>> > Yibo observed in ARROW-15282 [1], meaning UCX will get us both a
>>> > performant shared memory transport and support for more exotic
>>> > hardware.
>>> >
>>> > There's still much work to be done; at this point, I'd like to start
>>> > implementing the rest of the Flight methods, fixing up the many TODOs
>>> > scattered around, trying to refactor more things to share code between
>>> > gRPC/UCX, and finding and benchmarking some hardware that UCX has a fast
>>> > path for.
>>> >
>>> > [1]: https://issues.apache.org/jira/browse/ARROW-15282
>>> >
>>> > -David
>>> >
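
For readers unfamiliar with it, the UCX active-message API mentioned above has
roughly this shape: the receiver registers a handler for a message id, and the
sender posts payloads with ucp_am_send_nbx. This is a hedged sketch, not the
prototype's code; the message id and function names are made up, completion is
busy-polled for brevity, and it assumes a UCP context created with
UCP_FEATURE_AM plus an already-connected endpoint.

#include <cstdio>
#include <ucp/api/ucp.h>

// Arbitrary message id for illustration; sender and receiver must agree on it.
constexpr unsigned kFlightDataMsgId = 42;

// Receive side: invoked by UCX when a message with kFlightDataMsgId arrives.
// For eager messages the data is only valid during the callback unless it is
// retained, so a real implementation would copy or retain it here.
ucs_status_t OnFlightData(void* /*arg*/, const void* header, size_t header_length,
                          void* data, size_t length,
                          const ucp_am_recv_param_t* /*param*/) {
  std::printf("got message: %zu header bytes, %zu payload bytes\n",
              header_length, length);
  (void)header;
  (void)data;
  return UCS_OK;
}

void RegisterHandler(ucp_worker_h worker) {
  ucp_am_handler_param_t param{};
  param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID | UCP_AM_HANDLER_PARAM_FIELD_CB;
  param.id = kFlightDataMsgId;
  param.cb = OnFlightData;
  ucp_worker_set_am_recv_handler(worker, &param);
}

// Send side: post the payload as an active message and wait for completion.
void SendPayload(ucp_worker_h worker, ucp_ep_h ep, const void* payload, size_t length) {
  ucp_request_param_t param{};
  param.op_attr_mask = 0;  // defaults: contiguous byte buffer, no callback
  ucs_status_ptr_t request = ucp_am_send_nbx(ep, kFlightDataMsgId,
                                             /*header=*/nullptr, /*header_length=*/0,
                                             payload, length, &param);
  if (request == nullptr) return;       // completed immediately
  if (UCS_PTR_IS_ERR(request)) return;  // send failed
  while (ucp_request_check_status(request) == UCS_INPROGRESS) {
    ucp_worker_progress(worker);        // busy-poll only for this sketch
  }
  ucp_request_free(request);
}
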
>>> > On Tue, Jan 18, 2022, at 04:35, Yibo Cai wrote:
>>> > > Some updates.
>>> > >
>>> > > I tested David's UCX transport patch over 100Gb network. FlightRPC over
>>> > > UCX/RDMA improves throughput by about 50%, with lower and flatter latency.
>>> > > And I think there are chances to improve further. See the test report [1].
>>> > >
>>> > > For the data plane approach, the PoC shared memory data plane also
>>> > > delivers a significant performance boost. Details at [2].
>>> > >
>>> > > Glad to see there is big potential to improve FlightRPC performance.
>>> > >
>>> > > [1] https://issues.apache.org/jira/browse/ARROW-15229
>>> > > [2] https://issues.apache.org/jira/browse/ARROW-15282
>>> > >
>>> > > On 12/30/21 11:57 PM, David Li wrote:
>>> > > > Ah, I see.
>>> > > >
>>> > > > I think both projects can proceed as well. At some point we will have
>>> > to figure out how to merge them, but I think it's too early to see how
>>> > exactly we will want to refactor things.
>>> > > >
>>> > > > I looked over the code and I don't have any important comments for
>>> > now. Looking forward to reviewing when it's ready.
>>> > > >
>>> > > > -David
>>> > > >
>>> > > > On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
>>> > > >>
>>> > > >>
>>> > > >> On 12/29/21 11:03 PM, David Li wrote:
>>> > > >>> Awesome, thanks for sharing this too!
>>> > > >>>
>>> > > >>> The refactoring you have with DataClientStream is what I would like to
>>> > do as well - I think much of the existing code can be adapted to be more
>>> > transport-agnostic and then it will be easier to support new transports
>>> > (whether data-only or for all methods).
>>> > > >>>
>>> > > >>> Where do you see the gaps between gRPC and this? I think what would
>>> > happen is: 1) the client calls GetFlightInfo, 2) the server returns a `shm://`
>>> > URI, 3) the client sees the unfamiliar prefix and creates a new client for the
>>> > DoGet call (it would have to do this anyway if, for instance, the GetFlightInfo
>>> > call returned the address of a different server).
>>> > > >>>
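
A hedged sketch of the client-side dispatch described above, against the Flight
C++ API. The Result-returning client methods from recent releases are assumed,
the "shm://" scheme is hypothetical, and the "query" command plus the
first-endpoint shortcut are placeholders.

#include <memory>
#include <string>

#include <arrow/flight/client.h>
#include <arrow/result.h>
#include <arrow/status.h>

namespace flight = arrow::flight;

// 1) ask the server where the data lives, 2) look at the endpoint's location,
// 3) open a new client if the scheme is not the one we are already using.
arrow::Status FetchFirstEndpoint(flight::FlightClient* client) {
  ARROW_ASSIGN_OR_RAISE(
      auto info, client->GetFlightInfo(flight::FlightDescriptor::Command("query")));
  // Assumes at least one endpoint with at least one location, for brevity.
  const flight::FlightEndpoint& endpoint = info->endpoints()[0];
  const flight::Location& location = endpoint.locations[0];

  if (location.ToString().rfind("shm://", 0) == 0) {
    // Unfamiliar scheme: connect a dedicated client for the data call.
    ARROW_ASSIGN_OR_RAISE(auto data_client, flight::FlightClient::Connect(location));
    ARROW_ASSIGN_OR_RAISE(auto reader, data_client->DoGet(endpoint.ticket));
    // ... consume the stream ...
    return arrow::Status::OK();
  }
  ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
  // ... consume the stream ...
  return arrow::Status::OK();
}
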
>>> > > >>
>>> > > >> I mean implementation details. Some unit tests run longer than
>>> > expected
>>> > > >> (data plane timeouts reading from an ended stream). It looks like the grpc
>>> > > >> stream finish message is not correctly intercepted and forwarded to the data
>>> > plane.
>>> > > >> I don't think it's a big problem, it just needs some time to debug.
>>> > > >>
>>> > > >>> I also wonder how this stacks up to UCX's shared memory backend (I
>>> > did not test this though).
>>> > > >>>
>>> > > >>
>>> > > >> I implemented a shared memory data plane only to verify and
>>> > consolidate
>>> > > >> the data plane design, as it's the easiest (and useful) driver. I also
>>> > > >> plan to implement a socket-based data plane, not useful in practice,
>>> > > >> only to make sure the design works OK across the network. Then we can add
>>> > > >> more useful drivers like UCX or DPDK (the benefit of DPDK is that it works
>>> > on
>>> > > >> commodity hardware, unlike UCX/RDMA, which requires expensive
>>> > equipment).
>>> > > >>
>>> > > >>> I would like to be able to support entirely new transports for certain
>>> > cases (namely browser support - though perhaps one of the gRPC proxies
>>> > would suffice there), but even in that case, we could make it so that a new
>>> > transport only needs to implement the data plane methods. Only having to
>>> > support the data plane methods would save significant implementation effort
>>> > for all non-browser cases, so I think it's a worthwhile approach.
>>> > > >>>
>>> > > >>
>>> > > >> Thanks for being interested in this approach. My current plan is to
>>> > first
>>> > > >> refactor the shared memory data plane to verify it beats grpc in local rpc
>>> > > >> by a considerable margin; otherwise there must be big mistakes in my
>>> > > >> design. After that I will fix the unit test issues and deliver for
>>> > community
>>> > > >> review.
>>> > > >>
>>> > > >> Anyway, don't let me block your implementations. And if you think it's
>>> > > >> useful, I can push the current code for more detailed discussion.
>>> > > >>
>>> > > >>> -David
>>> > > >>>
>>> > > >>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
>>> > > >>>> Thanks David for initiating the UCX integration, great work!
>>> > > >>>> I think a 5Gbps network is too limited for performance evaluation. I
>>> > will try the patch on a 100Gb RDMA network, hopefully we can see some
>>> > improvements.
>>> > > >>>> I once benchmarked flight over a 100Gb network [1]; grpc-based
>>> > throughput is 2.4GB/s for one thread, 8.8GB/s for six threads, about 60us
>>> > latency. I also benchmarked raw RDMA performance (same batch sizes as
>>> > flight), one thread can achieve 9GB/s with 12us latency. Of course the
>>> > comparison is not fair. With David's patch, we can get a more realistic
>>> > comparison.
>>> > > >>>>
>>> > > >>>> I'm implementing a data plane approach in the hope that we can adopt new
>>> > data acceleration methods easily. My approach is to replace only the
>>> > FlightData transmission of DoGet/Put/Exchange with data plane drivers, and
>>> > grpc is still used for all rpc calls.
>>> > > >>>> Code is at my github repo [2]. Besides the framework, I just
>>> > implemented a shared memory data plane driver as a PoC. Get/Put/Exchange unit
>>> > tests passed, TestCancel hangs, and some unit tests run longer than expected;
>>> > still debugging. The shared memory data plane performance is pretty bad
>>> > now, due to repeated map/unmap for each read/write; pre-allocated pages
>>> > should improve it much, still experimenting.
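
The "pre-allocated pages" idea amounts to mapping one region up front and
reusing it for every batch, instead of mapping/unmapping per read/write. A
minimal POSIX sketch of that pattern, with a made-up segment name/size and no
synchronization between writer and reader (a real data plane would need a ring
buffer or similar on top):

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include <cstddef>
#include <cstring>

struct ShmRegion {
  void* addr = nullptr;
  size_t size = 0;
};

// Create (or open) and map the segment once; keep it mapped for the
// lifetime of the connection instead of remapping per batch.
ShmRegion MapOnce(const char* name, size_t size, bool create) {
  int fd = shm_open(name, create ? (O_CREAT | O_RDWR) : O_RDWR, 0600);
  if (fd < 0) return {};
  if (create && ftruncate(fd, static_cast<off_t>(size)) != 0) {
    close(fd);
    return {};
  }
  void* addr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);  // the mapping stays valid after the fd is closed
  return addr == MAP_FAILED ? ShmRegion{} : ShmRegion{addr, size};
}

// Writing a serialized FlightData payload is then just a memcpy into the
// long-lived mapping (plus whatever signaling the data plane protocol uses).
void WritePayload(const ShmRegion& region, const void* payload, size_t length) {
  if (length <= region.size) std::memcpy(region.addr, payload, length);
}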
>>> > > >>>>
>>> > > >>>> Would like to hear community comments.
>>> > > >>>>
>>> > > >>>> My personal opinion is that the data plane approach reuses the grpc control
>>> > plane and may make it easier to add new data acceleration methods, but it needs to
>>> > fit into grpc seamlessly (there are still unresolved gaps). A new transport
>>> > requires much more initial effort, but may pay off later. And it looks like these
>>> > two approaches don't conflict with each other.
>>> > > >>>>
>>> > > >>>> [1] Test environment
>>> > > >>>> nics: mellanox connectx5
>>> > > >>>> hosts: client (neoverse n1), server (xeon gold 5218)
>>> > > >>>> os: ubuntu 20.04, linux kernel 5.4
>>> > > >>>> test case: 128k batch size, DoGet
>>> > > >>>>
>>> > > >>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
>>> > > >>>>
>>> > > >>>> ________________________________
>>> > > >>>> From: David Li <li...@apache.org>
>>> > > >>>> Sent: Wednesday, December 29, 2021 3:09 AM
>>> > > >>>> To: dev@arrow.apache.org <de...@arrow.apache.org>
>>> > > >>>> Subject: Re: Arrow in HPC
>>> > > >>>>
>>> > > >>>> I ended up drafting an implementation of Flight based on UCX, and
>>> > doing some
>>> > > >>>> of the necessary refactoring to support additional backends in the
>>> > future.
>>> > > >>>> It can run the Flight benchmark, and performance is about
>>> > comparable to
>>> > > >>>> gRPC, as tested on AWS EC2.
>>> > > >>>>
>>> > > >>>> The implementation is based on the UCP streams API. It's extremely
>>> > > >>>> bare-bones and is really only a proof of concept; a good amount of
>>> > work is
>>> > > >>>> needed to turn it into a usable implementation. I had hoped it
>>> > would perform
>>> > > >>>> markedly better than gRPC, at least in this early test, but this
>>> > seems not
>>> > > >>>> to be the case. That said: I am likely not using UCX properly, UCX
>>> > would
>>> > > >>>> still open up support for additional hardware, and this work should
>>> > allow
>>> > > >>>> other backends to be implemented more easily.
>>> > > >>>>
>>> > > >>>> The branch can be viewed at
>>> > > >>>> https://github.com/lidavidm/arrow/tree/flight-ucx
>>> > > >>>>
>>> > > >>>> I've attached the benchmark output at the end.
>>> > > >>>>
>>> > > >>>> There are still quite a few TODOs and things that need
>>> > investigating:
>>> > > >>>>
>>> > > >>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at
>>> > that.
>>> > > >>>> - Concurrent requests are not supported (nor is making more than
>>> > one
>>> > > >>>>     request on a connection), and the server does not support concurrent
>>> > clients.
>>> > > >>>>     We also need to decide whether to even support concurrent
>>> > requests, and
>>> > > >>>>     how (e.g. pooling multiple connections, or implementing a
>>> > gRPC/HTTP2 style
>>> > > >>>>     protocol, or even possibly implementing HTTP2).
>>> > > >>>> - We need to make sure we properly handle errors, etc. everywhere.
>>> > > >>>> - Are we using UCX in a performant and idiomatic manner? Will the
>>> > > >>>>     implementation work well on RDMA and other specialized hardware?
>>> > > >>>> - Do we also need to support the UCX tag API?
>>> > > >>>> - Can we refactor out interfaces that allow sharing more of the
>>> > > >>>>     client/server implementation between different backends?
>>> > > >>>> - Are the abstractions sufficient to support other potential
>>> > backends like
>>> > > >>>>     MPI, libfabrics, or WebSockets?
>>> > > >>>>
>>> > > >>>> If anyone has experience with UCX, I'd appreciate any feedback.
>>> > Otherwise,
>>> > > >>>> I'm hoping to plan out and try to tackle some of the TODOs above,
>>> > and figure
>>> > > >>>> out how this effort can proceed.
>>> > > >>>>
>>> > > >>>> Antoine/Micah raised the possibility of extending gRPC instead.
>>> > That would
>>> > > >>>> be preferable, frankly, given that otherwise we might have to
>>> > re-implement a
>>> > > >>>> lot of what gRPC and HTTP2 provide by ourselves. However, the
>>> > necessary
>>> > > >>>> proposal stalled and was dropped without much discussion:
>>> > > >>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
>>> > > >>>>
>>> > > >>>> Benchmark results (also uploaded at
>>> > > >>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
>>> > > >>>>
>>> > > >>>> Testing was done between two t3.xlarge instances in the same zone.
>>> > > >>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
>>> > > >>>>
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>>> > ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
>>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
>>> > -records_per_batch=4096
>>> > > >>>> Testing method: DoGet
>>> > > >>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627
>>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>> > > >>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627
>>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>> > > >>>> Number of perf runs: 1
>>> > > >>>> Number of concurrent gets/puts: 1
>>> > > >>>> Batch size: 131072
>>> > > >>>> Batches read: 10000
>>> > > >>>> Bytes read: 1310720000
>>> > > >>>> Nanos: 2165862969
>>> > > >>>> Speed: 577.137 MB/s
>>> > > >>>> Throughput: 4617.1 batches/s
>>> > > >>>> Latency mean: 214 us
>>> > > >>>> Latency quantile=0.5: 209 us
>>> > > >>>> Latency quantile=0.95: 340 us
>>> > > >>>> Latency quantile=0.99: 409 us
>>> > > >>>> Latency max: 6350 us
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>>> > ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
>>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
>>> > -records_per_batch=65536
>>> > > >>>> Testing method: DoGet
>>> > > >>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627
>>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>> > > >>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627
>>> > UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
>>> > > >>>> Number of perf runs: 1
>>> > > >>>> Number of concurrent gets/puts: 1
>>> > > >>>> Batch size: 2097152
>>> > > >>>> Batches read: 10000
>>> > > >>>> Bytes read: 20971520000
>>> > > >>>> Nanos: 34184175236
>>> > > >>>> Speed: 585.066 MB/s
>>> > > >>>> Throughput: 292.533 batches/s
>>> > > >>>> Latency mean: 3415 us
>>> > > >>>> Latency quantile=0.5: 3408 us
>>> > > >>>> Latency quantile=0.95: 3549 us
>>> > > >>>> Latency quantile=0.99: 3800 us
>>> > > >>>> Latency max: 20236 us
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>>> > ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
>>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
>>> > -records_per_batch=4096
>>> > > >>>> Testing method: DoGet
>>> > > >>>> Using standalone TCP server
>>> > > >>>> Server host: 172.31.34.4
>>> > > >>>> Server port: 31337
>>> > > >>>> Number of perf runs: 1
>>> > > >>>> Number of concurrent gets/puts: 1
>>> > > >>>> Batch size: 131072
>>> > > >>>> Batches read: 10000
>>> > > >>>> Bytes read: 1310720000
>>> > > >>>> Nanos: 2375289668
>>> > > >>>> Speed: 526.252 MB/s
>>> > > >>>> Throughput: 4210.01 batches/s
>>> > > >>>> Latency mean: 235 us
>>> > > >>>> Latency quantile=0.5: 203 us
>>> > > >>>> Latency quantile=0.95: 328 us
>>> > > >>>> Latency quantile=0.99: 1377 us
>>> > > >>>> Latency max: 17860 us
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
>>> > ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
>>> > 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
>>> > -records_per_batch=65536
>>> > > >>>> Testing method: DoGet
>>> > > >>>> Using standalone TCP server
>>> > > >>>> Server host: 172.31.34.4
>>> > > >>>> Server port: 31337
>>> > > >>>> Number of perf runs: 1
>>> > > >>>> Number of concurrent gets/puts: 1
>>> > > >>>> Batch size: 2097152
>>> > > >>>> Batches read: 10000
>>> > > >>>> Bytes read: 20971520000
>>> > > >>>> Nanos: 34202704498
>>> > > >>>> Speed: 584.749 MB/s
>>> > > >>>> Throughput: 292.375 batches/s
>>> > > >>>> Latency mean: 3416 us
>>> > > >>>> Latency quantile=0.5: 3406 us
>>> > > >>>> Latency quantile=0.95: 3548 us
>>> > > >>>> Latency quantile=0.99: 3764 us
>>> > > >>>> Latency max: 17086 us
>>> > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4
>>> > -p 1337 -Z -l 1M
>>> > > >>>> Connecting to host 172.31.34.4, port 1337
>>> > > >>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port
>>> > 1337
>>> > > >>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
>>> > > >>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35
>>> > MBytes
>>> > > >>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43
>>> > MBytes
>>> > > >>>> - - - - - - - - - - - - - - - - - - - - - - - - -
>>> > > >>>> [ ID] Interval           Transfer     Bitrate         Retr
>>> > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36
>>> >    sender
>>> > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec
>>> >   receiver
>>> > > >>>>
>>> > > >>>> iperf Done.
>>> > > >>>>
>>> > > >>>> Best,
>>> > > >>>> David
>>> > > >>>>
>>> > > >>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
>>> > > >>>>> "David Li" <li...@apache.org> writes:
>>> > > >>>>>
>>> > > >>>>>> Thanks for the clarification Yibo, looking forward to the
>>> > results. Even if it is a very hacky PoC it will be interesting to see how
>>> > it affects performance, though as Keith points out there are benefits in
>>> > general to UCX (or similar library), and we can work out the implementation
>>> > plan from there.
>>> > > >>>>>>
>>> > > >>>>>> To Benson's point - the work done to get UCX supported would pave
>>> > the way to supporting other backends as well. I'm personally not familiar
>>> > with UCX, MPI, etc. so is MPI here more about playing well with established
>>> > practices or does it also offer potential hardware support/performance
>>> > improvements like UCX would?
>>> > > >>>>>
>>> > > >>>>> There are two main implementations of MPI, MPICH and Open MPI,
>>> > both of which are permissively licensed open source community projects.
>>> > Both have direct support for UCX and unless your needs are very specific,
>>> > the overhead of going through MPI is likely to be negligible. Both also
>>> > have proprietary derivatives, such as Cray MPI (MPICH derivative) and
>>> > Spectrum MPI (Open MPI derivative), which may have optimizations for
>>> > proprietary networks. Both MPICH and Open MPI can be built without UCX, and
>>> > this is often easier (UCX 'master' is more volatile in my experience).
>>> > > >>>>>
>>> > > >>>>> The vast majority of distributed memory scientific applications
>>> > use MPI or higher level libraries, rather than writing directly to UCX
>>> > (which provides less coverage of HPC networks). I think MPI compatibility
>>> > is important.
>>> > > >>>>>
>>> > > >>>>>   From way up-thread (sorry):
>>> > > >>>>>
>>> > > >>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As
>>> > another
>>> > > >>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with
>>> > the HPC
>>> > > >>>>>>>>>>>> space.
>>> > > >>>>>
>>> > > >>>>> MPI has collective operations like MPI_Allreduce (perform a
>>> > reduction and give every process the result; these run in log(P) or better
>>> > time with small constants -- 15 microseconds is typical for a cheap
>>> > reduction operation on a million processes). MPI supports user-defined
>>> > operations for reductions and prefix-scan operations. If we defined MPI_Ops
>>> > for Arrow types, we could compute summary statistics and other algorithmic
>>> > building blocks fast at arbitrary scale.
>>> > > >>>>>
>>> > > >>>>> The collective execution model might not be everyone's bag, but
>>> > MPI_Op can also be used in one-sided operations (MPI_Accumulate and
>>> > MPI_Fetch_and_op) and dropping into collective mode has big advantages for
>>> > certain algorithms in computational statistics/machine learning.
>>> > > >>>>>
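
To make the MPI_Op idea concrete, here is a hedged sketch of a user-defined
reduction that computes per-column summary statistics with MPI_Allreduce. The
Summary struct and function names are made up, a plain double buffer stands in
for an Arrow Float64 column, and null handling is ignored.

#include <mpi.h>

#include <cfloat>
#include <cstdio>
#include <vector>

// Per-column summary reduced across ranks.
struct Summary {
  double sum;
  double min;
  double max;
  double count;
};

// MPI user-defined reduction: combine partial summaries elementwise.
void CombineSummaries(void* invec, void* inoutvec, int* len, MPI_Datatype*) {
  auto* in = static_cast<Summary*>(invec);
  auto* inout = static_cast<Summary*>(inoutvec);
  for (int i = 0; i < *len; ++i) {
    inout[i].sum += in[i].sum;
    inout[i].min = in[i].min < inout[i].min ? in[i].min : inout[i].min;
    inout[i].max = in[i].max > inout[i].max ? in[i].max : inout[i].max;
    inout[i].count += in[i].count;
  }
}

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // Stand-in for the values buffer of a local Arrow Float64 column chunk.
  std::vector<double> values = {1.0 * rank, 2.0 * rank, 3.0 * rank};

  Summary local{0.0, DBL_MAX, -DBL_MAX, 0.0};
  for (double v : values) {
    local.sum += v;
    local.min = v < local.min ? v : local.min;
    local.max = v > local.max ? v : local.max;
    local.count += 1.0;
  }

  // A Summary is four contiguous doubles, so a derived datatype suffices.
  MPI_Datatype summary_type;
  MPI_Type_contiguous(4, MPI_DOUBLE, &summary_type);
  MPI_Type_commit(&summary_type);

  MPI_Op combine_op;
  MPI_Op_create(&CombineSummaries, /*commute=*/1, &combine_op);

  Summary global;
  MPI_Allreduce(&local, &global, 1, summary_type, combine_op, MPI_COMM_WORLD);

  if (rank == 0) {
    std::printf("count=%.0f sum=%f min=%f max=%f mean=%f\n", global.count,
                global.sum, global.min, global.max, global.sum / global.count);
  }

  MPI_Op_free(&combine_op);
  MPI_Type_free(&summary_type);
  MPI_Finalize();
  return 0;
}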
>>> > > >>>>
>>> > > >>>
>>> > > >>
>>> > > >
>>> > >
>>> >
>>> 
>>> 
>>> -- 
>>> Supun Kamburugamuve
>>>

Re: Arrow in HPC

Posted by Gavin Ray <ra...@gmail.com>.
Congrats!
