You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2020/09/22 07:44:40 UTC

[arrow] branch master updated: ARROW-10013: [FlightRPC][C++] fix setting generic client options

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c47b58a  ARROW-10013: [FlightRPC][C++] fix setting generic client options
c47b58a is described below

commit c47b58a7c0e16de6e70c081185f7d326a567a599
Author: David Li <li...@gmail.com>
AuthorDate: Tue Sep 22 09:43:49 2020 +0200

    ARROW-10013: [FlightRPC][C++] fix setting generic client options
    
    Some gRPC version upgrade meant that for settings that were present twice, the first value was taken instead of the last value. This changes how we set default options to avoid this issue.
    
    Closes #8196 from lidavidm/arrow-10013
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/flight/client.cc      | 16 ++++++++++++----
 cpp/src/arrow/flight/flight_test.cc |  2 +-
 python/pyarrow/_flight.pyx          |  3 ++-
 python/pyarrow/tests/test_flight.py |  2 ++
 4 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index 8181ae7..b25b13e 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -25,6 +25,7 @@
 #include <mutex>
 #include <sstream>
 #include <string>
+#include <unordered_map>
 #include <utility>
 
 #ifdef GRPCPP_PP_INCLUDE
@@ -867,13 +868,17 @@ class FlightClient::FlightClientImpl {
     }
 
     grpc::ChannelArguments args;
+    // We can't set the same config value twice, so for values where
+    // we want to set defaults, keep them in a map and update them;
+    // then update them all at once
+    std::unordered_map<std::string, int> default_args;
     // Try to reconnect quickly at first, in case the server is still starting up
-    args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
+    default_args[GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS] = 100;
     // Receive messages of any size
-    args.SetMaxReceiveMessageSize(-1);
+    default_args[GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH] = -1;
     // Setting this arg enables each client to open it's own TCP connection to server,
     // not sharing one single connection, which becomes bottleneck under high load.
-    args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
+    default_args[GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL] = 1;
 
     if (options.override_hostname != "") {
       args.SetSslTargetNameOverride(options.override_hostname);
@@ -882,12 +887,15 @@ class FlightClient::FlightClientImpl {
     // Allow setting generic gRPC options.
     for (const auto& arg : options.generic_options) {
       if (util::holds_alternative<int>(arg.second)) {
-        args.SetInt(arg.first, util::get<int>(arg.second));
+        default_args[arg.first] = util::get<int>(arg.second);
       } else if (util::holds_alternative<std::string>(arg.second)) {
         args.SetString(arg.first, util::get<std::string>(arg.second));
       }
       // Otherwise unimplemented
     }
+    for (const auto& pair : default_args) {
+      args.SetInt(pair.first, pair.second);
+    }
 
     std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
         interceptors;
diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc
index cb88d85..8726a55 100644
--- a/cpp/src/arrow/flight/flight_test.cc
+++ b/cpp/src/arrow/flight/flight_test.cc
@@ -1417,7 +1417,7 @@ TEST_F(TestFlightClient, GenericOptions) {
   std::unique_ptr<FlightClient> client;
   auto options = FlightClientOptions::Defaults();
   // Set a very low limit at the gRPC layer to fail all calls
-  options.generic_options.emplace_back(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 32);
+  options.generic_options.emplace_back(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 4);
   Location location;
   ASSERT_OK(Location::ForGrpcTcp("localhost", server_->port(), &location));
   ASSERT_OK(FlightClient::Connect(location, options, &client));
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index b7459d2..484fbb2 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -1435,7 +1435,8 @@ cdef class ServerCallContext(_Weakrefable):
 
     def peer(self):
         """Get the address of the peer."""
-        return frombytes(self.context.peer())
+        # Set safe=True as gRPC on Windows sometimes gives garbage bytes
+        return frombytes(self.context.peer(), safe=True)
 
     def get_middleware(self, key):
         """
diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py
index 50e993d..5d0e1c0 100644
--- a/python/pyarrow/tests/test_flight.py
+++ b/python/pyarrow/tests/test_flight.py
@@ -952,6 +952,8 @@ def test_http_basic_unauth():
             list(client.do_action(action))
 
 
+@pytest.mark.skipif(os.name == 'nt',
+                    reason="ARROW-10013: gRPC on Windows corrupts peer()")
 def test_http_basic_auth():
     """Test a Python implementation of HTTP basic authentication."""
     with EchoStreamFlightServer(auth_handler=basic_auth_handler) as server: