You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2023/01/03 12:31:28 UTC
[arrow] branch master updated: GH-15150: [C++][FlightRPC] Wait for side effects in DoAction (#15152)
This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new f4ed8185eb GH-15150: [C++][FlightRPC] Wait for side effects in DoAction (#15152)
f4ed8185eb is described below
commit f4ed8185ebc1804092de46e58078414910587958
Author: David Li <li...@gmail.com>
AuthorDate: Tue Jan 3 07:31:22 2023 -0500
GH-15150: [C++][FlightRPC] Wait for side effects in DoAction (#15152)
* Closes: #15150
Authored-by: David Li <li...@gmail.com>
Signed-off-by: Sutou Kouhei <ko...@clear-code.com>
---
cpp/src/arrow/flight/flight_test.cc | 29 +++++++++++++++++++++-
cpp/src/arrow/flight/transport/grpc/grpc_client.cc | 2 ++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc
index c2530360ab..502a789840 100644
--- a/cpp/src/arrow/flight/flight_test.cc
+++ b/cpp/src/arrow/flight/flight_test.cc
@@ -1469,8 +1469,12 @@ class CancelTestServer : public FlightServerBase {
*listings = std::make_unique<ForeverFlightListing>();
return Status::OK();
}
- Status DoAction(const ServerCallContext&, const Action&,
+ Status DoAction(const ServerCallContext&, const Action& action,
std::unique_ptr<ResultStream>* result) override {
+ if (action.type == "inc") {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ counter_++;
+ }
*result = std::make_unique<ForeverResultStream>();
return Status::OK();
}
@@ -1484,6 +1488,11 @@ class CancelTestServer : public FlightServerBase {
*data_stream = std::make_unique<ForeverDataStream>();
return Status::OK();
}
+
+ int64_t CheckCounter() const { return counter_; }
+
+ private:
+ std::atomic<int64_t> counter_ = 0;
};
class TestCancel : public ::testing::Test {
@@ -1497,6 +1506,9 @@ class TestCancel : public ::testing::Test {
ASSERT_OK(client_->Close());
ASSERT_OK(server_->Shutdown());
}
+ CancelTestServer* Server() const {
+ return static_cast<CancelTestServer*>(server_.get());
+ }
protected:
std::unique_ptr<FlightClient> client_;
@@ -1524,6 +1536,21 @@ TEST_F(TestCancel, DoAction) {
stream->Next());
}
+TEST_F(TestCancel, DoActionSideEffect) {
+ // GH-15150: DoAction should at least wait for the server to begin
+ // the response, since existing code may be using DoAction solely
+ // for the side effect.
+ ASSERT_EQ(0, Server()->CheckCounter());
+ StopSource stop_source;
+ FlightCallOptions options;
+ options.stop_token = stop_source.token();
+ // Will block for a bit, but not forever
+ ASSERT_OK_AND_ASSIGN(auto stream, client_->DoAction(options, {"inc", nullptr}));
+ // Side effect should have happened
+ ASSERT_EQ(1, Server()->CheckCounter());
+ stop_source.RequestStop(Status::Cancelled("StopSource"));
+}
+
TEST_F(TestCancel, ListActions) {
StopSource stop_source;
FlightCallOptions options;
diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
index e694e2b4c2..8ebd7cfcd9 100644
--- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
+++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
@@ -529,6 +529,8 @@ class GrpcResultStream : public ResultStream {
RETURN_NOT_OK(internal::ToProto(action, &pb_action));
RETURN_NOT_OK(rpc_.SetToken(auth_handler));
stream_ = stub->DoAction(&rpc_.context, pb_action);
+ // GH-15150: wait for initial metadata to allow some side effects to occur
+ stream_->WaitForInitialMetadata();
return Status::OK();
}