You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/12/27 20:56:10 UTC

[arrow] branch master updated: ARROW-18353: [C++][FlightRPC] Prevent concurrent Finish in UCX (#15034)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ceb2479d44 ARROW-18353: [C++][FlightRPC] Prevent concurrent Finish in UCX (#15034)
ceb2479d44 is described below

commit ceb2479d4486702f7faed5f26708822125939688
Author: David Li <li...@gmail.com>
AuthorDate: Tue Dec 27 15:56:04 2022 -0500

    ARROW-18353: [C++][FlightRPC] Prevent concurrent Finish in UCX (#15034)
    
    The reader and writer can be used concurrently with each other,
    and both may try to close the underlying stream when an error
    happens. This can cause one side to inadvertently stomp on the
    other's state. When that happens, we may get a crash or trigger
    an assertion, but sometimes things just happen to work out and
    the client exits without proper cleanup. The server, however,
    doesn't realize the client has exited, so it gets stuck waiting
    for the client, which prevents the test from finishing.
    
    The longer-term fix would ideally be to let the server forcefully
    terminate instead of waiting for clients. (Incidentally, this
    problem also affects the gRPC transport, though gRPC is better
    at detecting when the client has disappeared.) In this change, we
    add more assertions and take a lock while finishing the client
    stream, so that cleanup cannot run concurrently.
    
    Requires #15031.
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 cpp/src/arrow/flight/transport/ucx/ucx_client.cc | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/cpp/src/arrow/flight/transport/ucx/ucx_client.cc b/cpp/src/arrow/flight/transport/ucx/ucx_client.cc
index 21bc7087ff..d11adb54ad 100644
--- a/cpp/src/arrow/flight/transport/ucx/ucx_client.cc
+++ b/cpp/src/arrow/flight/transport/ucx/ucx_client.cc
@@ -125,7 +125,7 @@ class ClientConnection {
       RETURN_NOT_OK(FromUcsStatus("ucp_ep_create", status));
     }
 
-    driver_.reset(new UcpCallDriver(ucp_worker_, remote_endpoint_));
+    driver_ = std::make_unique<UcpCallDriver>(ucp_worker_, remote_endpoint_);
     ARROW_LOG(DEBUG) << "Connected to " << driver_->peer();
 
     {
@@ -187,11 +187,15 @@ class UcxClientStream : public internal::ClientDataStream {
         conn_(std::move(conn)),
         driver_(conn_.driver()),
         writes_done_(false),
-        finished_(false) {}
+        finished_(false) {
+    DCHECK_NE(impl, nullptr);
+    DCHECK_NE(conn_.driver(), nullptr);
+  }
 
  protected:
   Status DoFinish() override;
 
+  std::mutex finish_mutex_;
   UcxClientImpl* impl_;
   ClientConnection conn_;
   UcpCallDriver* driver_;
@@ -509,9 +513,9 @@ class ExchangeClientStream : public WriteClientStream {
 
 class UcxClientImpl : public arrow::flight::internal::ClientTransport {
  public:
-  UcxClientImpl() {}
+  UcxClientImpl() = default;
 
-  virtual ~UcxClientImpl() {
+  ~UcxClientImpl() override {
     if (!ucp_context_) return;
     ARROW_WARN_NOT_OK(Close(), "UcxClientImpl errored in Close() in destructor");
   }
@@ -557,8 +561,9 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport {
   Status Close() override {
     std::unique_lock<std::mutex> connections_mutex_;
     while (!connections_.empty()) {
-      RETURN_NOT_OK(connections_.front().Close());
+      ClientConnection conn = std::move(connections_.front());
       connections_.pop_front();
+      RETURN_NOT_OK(conn.Close());
     }
     return Status::OK();
   }
@@ -650,6 +655,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport {
   Status MakeConnection() {
     ClientConnection conn;
     RETURN_NOT_OK(conn.Init(ucp_context_, uri_));
+    std::unique_lock<std::mutex> connections_mutex_;
     connections_.push_back(std::move(conn));
     return Status::OK();
   }
@@ -658,10 +664,10 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport {
     std::unique_lock<std::mutex> connections_mutex_;
     if (connections_.empty()) RETURN_NOT_OK(MakeConnection());
     ClientConnection conn = std::move(connections_.front());
+    connections_.pop_front();
     conn.driver()->set_memory_manager(options.memory_manager);
     conn.driver()->set_read_memory_pool(options.read_options.memory_pool);
     conn.driver()->set_write_memory_pool(options.write_options.memory_pool);
-    connections_.pop_front();
     return conn;
   }
 
@@ -675,6 +681,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport {
       RETURN_NOT_OK(conn.Close());
       return Status::OK();
     }
+    DCHECK_NE(conn.driver(), nullptr);
     connections_.push_back(std::move(conn));
     return Status::OK();
   }
@@ -690,6 +697,9 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport {
 
 Status UcxClientStream::DoFinish() {
   RETURN_NOT_OK(WritesDone());
+  // Both reader and writer may be used concurrently, and both may
+  // call Finish() - prevent concurrent state mutation
+  std::lock_guard<std::mutex> guard(finish_mutex_);
   if (!finished_) {
     internal::FlightData message;
     std::shared_ptr<Buffer> metadata;
@@ -700,6 +710,7 @@ Status UcxClientStream::DoFinish() {
     finished_ = true;
   }
   if (impl_) {
+    DCHECK_NE(conn_.driver(), nullptr);
     auto status = impl_->ReturnConnection(std::move(conn_));
     impl_ = nullptr;
     driver_ = nullptr;