You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2023/01/03 12:31:28 UTC

[arrow] branch master updated: GH-15150: [C++][FlightRPC] Wait for side effects in DoAction (#15152)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f4ed8185eb GH-15150: [C++][FlightRPC] Wait for side effects in DoAction (#15152)
f4ed8185eb is described below

commit f4ed8185ebc1804092de46e58078414910587958
Author: David Li <li...@gmail.com>
AuthorDate: Tue Jan 3 07:31:22 2023 -0500

    GH-15150: [C++][FlightRPC] Wait for side effects in DoAction (#15152)
    
    
    * Closes: #15150
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: Sutou Kouhei <ko...@clear-code.com>
---
 cpp/src/arrow/flight/flight_test.cc                | 29 +++++++++++++++++++++-
 cpp/src/arrow/flight/transport/grpc/grpc_client.cc |  2 ++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc
index c2530360ab..502a789840 100644
--- a/cpp/src/arrow/flight/flight_test.cc
+++ b/cpp/src/arrow/flight/flight_test.cc
@@ -1469,8 +1469,12 @@ class CancelTestServer : public FlightServerBase {
     *listings = std::make_unique<ForeverFlightListing>();
     return Status::OK();
   }
-  Status DoAction(const ServerCallContext&, const Action&,
+  Status DoAction(const ServerCallContext&, const Action& action,
                   std::unique_ptr<ResultStream>* result) override {
+    if (action.type == "inc") {
+      std::this_thread::sleep_for(std::chrono::milliseconds(100));
+      counter_++;
+    }
     *result = std::make_unique<ForeverResultStream>();
     return Status::OK();
   }
@@ -1484,6 +1488,11 @@ class CancelTestServer : public FlightServerBase {
     *data_stream = std::make_unique<ForeverDataStream>();
     return Status::OK();
   }
+
+  int64_t CheckCounter() const { return counter_; }
+
+ private:
+  std::atomic<int64_t> counter_ = 0;
 };
 
 class TestCancel : public ::testing::Test {
@@ -1497,6 +1506,9 @@ class TestCancel : public ::testing::Test {
     ASSERT_OK(client_->Close());
     ASSERT_OK(server_->Shutdown());
   }
+  CancelTestServer* Server() const {
+    return static_cast<CancelTestServer*>(server_.get());
+  }
 
  protected:
   std::unique_ptr<FlightClient> client_;
@@ -1524,6 +1536,21 @@ TEST_F(TestCancel, DoAction) {
                                   stream->Next());
 }
 
+TEST_F(TestCancel, DoActionSideEffect) {
+  // GH-15150: DoAction should at least wait for the server to begin
+  // the response, since existing code may be using DoAction solely
+  // for the side effect.
+  ASSERT_EQ(0, Server()->CheckCounter());
+  StopSource stop_source;
+  FlightCallOptions options;
+  options.stop_token = stop_source.token();
+  // Will block for a bit, but not forever
+  ASSERT_OK_AND_ASSIGN(auto stream, client_->DoAction(options, {"inc", nullptr}));
+  // Side effect should have happened
+  ASSERT_EQ(1, Server()->CheckCounter());
+  stop_source.RequestStop(Status::Cancelled("StopSource"));
+}
+
 TEST_F(TestCancel, ListActions) {
   StopSource stop_source;
   FlightCallOptions options;
diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
index e694e2b4c2..8ebd7cfcd9 100644
--- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
+++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
@@ -529,6 +529,8 @@ class GrpcResultStream : public ResultStream {
     RETURN_NOT_OK(internal::ToProto(action, &pb_action));
     RETURN_NOT_OK(rpc_.SetToken(auth_handler));
     stream_ = stub->DoAction(&rpc_.context, pb_action);
+    // GH-15150: wait for initial metadata to allow some side effects to occur
+    stream_->WaitForInitialMetadata();
     return Status::OK();
   }