You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/03/13 01:22:53 UTC

[arrow] branch master updated: ARROW-4421: [C++][Flight] Handle large RPC messages in Flight

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new bd0dfe5  ARROW-4421: [C++][Flight] Handle large RPC messages in Flight
bd0dfe5 is described below

commit bd0dfe5d2760fcefdc8bba096d50335b166a3001
Author: David Li <Da...@twosigma.com>
AuthorDate: Tue Mar 12 20:22:44 2019 -0500

    ARROW-4421: [C++][Flight] Handle large RPC messages in Flight
    
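    gRPC has a default maximum receive message size of 4 MiB, so larger Flight messages were rejected; this change removes that limit on both the C++ client and server.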
    
    Author: David Li <Da...@twosigma.com>
    
    Closes #3878 from lihalite/arrow-4421 and squashes the following commits:
    
    a4efbbd17 <David Li> Accept messages of any size in C++/Python Flight server
    0d8c8ccb6 <David Li> Accept messages of any size in C++/Python Flight client
---
 cpp/src/arrow/flight/client.cc      |  2 ++
 cpp/src/arrow/flight/server.cc      |  2 ++
 python/pyarrow/tests/test_flight.py | 36 ++++++++++++++++++++++++++++++++++++
 3 files changed, 40 insertions(+)

diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index 50c70ee..f3420c4 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -198,6 +198,8 @@ class FlightClient::FlightClientImpl {
     grpc::ChannelArguments args;
     // Try to reconnect quickly at first, in case the server is still starting up
     args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
+    // Receive messages of any size
+    args.SetMaxReceiveMessageSize(-1);
     stub_ = pb::FlightService::NewStub(
         grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), args));
     return Status::OK();
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 3628821..cc1c03d 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -352,6 +352,8 @@ Status FlightServerBase::Init(int port) {
   impl_->service_.reset(new FlightServiceImpl(this));
 
   grpc::ServerBuilder builder;
+  // Allow uploading messages of any length
+  builder.SetMaxReceiveMessageSize(-1);
   builder.AddListeningPort(impl_->address_, grpc::InsecureServerCredentials());
   builder.RegisterService(impl_->service_.get());
 
diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py
index d225f77..b1b6a12 100644
--- a/python/pyarrow/tests/test_flight.py
+++ b/python/pyarrow/tests/test_flight.py
@@ -43,6 +43,20 @@ class ConstantFlightServer(flight.FlightServerBase):
         return flight.RecordBatchStream(table)
 
 
+class EchoFlightServer(flight.FlightServerBase):
+    """A Flight server that returns the last data uploaded."""
+
+    def __init__(self):
+        super(EchoFlightServer, self).__init__()
+        self.last_message = None
+
+    def do_get(self, ticket):
+        return flight.RecordBatchStream(self.last_message)
+
+    def do_put(self, descriptor, reader):
+        self.last_message = reader.read_all()
+
+
 @contextlib.contextmanager
 def flight_server(server_base, *args, **kwargs):
     """Spawn a Flight server on a free port, shutting it down when done."""
@@ -78,3 +92,25 @@ def test_flight_do_get():
         client = flight.FlightClient.connect('localhost', server_port)
         data = client.do_get(flight.Ticket(b''), table.schema).read_all()
         assert data.equals(table)
+
+
+@pytest.mark.slow
+def test_flight_large_message():
+    """Try sending/receiving a large message via Flight.
+
+    See ARROW-4421: by default, gRPC won't allow us to send messages >
+    4MiB in size.
+    """
+    data = pa.Table.from_arrays([
+        pa.array(range(0, 10 * 1024 * 1024))
+    ], names=['a'])
+
+    with flight_server(EchoFlightServer) as server_port:
+        client = flight.FlightClient.connect('localhost', server_port)
+        writer = client.do_put(flight.FlightDescriptor.for_path('test'),
+                               data.schema)
+        # Write a single giant chunk
+        writer.write_table(data, 10 * 1024 * 1024)
+        writer.close()
+        result = client.do_get(flight.Ticket(b''), data.schema).read_all()
+        assert result.equals(data)