You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/19 14:00:11 UTC

[arrow] branch master updated: ARROW-5533: [C++] [Plasma] make plasma client thread safe

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 726f90f  ARROW-5533: [C++] [Plasma] make plasma client thread safe
726f90f is described below

commit 726f90f982886b55381414a537baf3c37af0b32f
Author: Zhijun Fu <pi...@antfin.com>
AuthorDate: Wed Jun 19 08:59:59 2019 -0500

    ARROW-5533: [C++] [Plasma] make plasma client thread safe
    
    Make the Plasma client thread-safe, so that when the PlasmaBuffer for an object is destroyed, the resulting call to plasma_client.Release(object_id) is properly synchronized. See [this discussion](https://github.com/ray-project/ray/pull/4922#discussion_r291422782) for background.
    
    @robertnishihara @pcmoritz
    
    Author: Zhijun Fu <pi...@antfin.com>
    
    Closes #4503 from zhijunfu/plasma-client-mutex and squashes the following commits:
    
    ee2575a21 <Zhijun Fu> fix merge
    217c31d77 <Zhijun Fu> Merge remote-tracking branch 'origin/plasma-client-mutex' into plasma-client-mutex
    7ea924a9d <Zhijun Fu> fix unintended changes
    c82ad9367 <Zhijun Fu> change mutex to recursive_mutex
    ed7392c2f <Zhijun Fu> add lock to plasma client
    6249e98d3 <Zhijun Fu> change mutex to recursive_mutex
    54bbdf4a0 <Zhijun Fu> Merge branch 'master' of https://github.com/apache/arrow into plasma-client-mutex
    afdbe688e <Zhijun Fu> add lock to plasma client
---
 cpp/src/plasma/client.cc | 37 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 9447e5d..ce9795d 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -301,6 +301,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
   int64_t store_capacity_;
   /// A hash set to record the ids that users want to delete but still in use.
   std::unordered_set<ObjectID> deletion_cache_;
+  /// A mutex which protects this class.
+  std::recursive_mutex client_mutex_;
 
 #ifdef PLASMA_CUDA
   /// Cuda Device Manager.
@@ -341,6 +343,8 @@ uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
 }
 
 bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   const auto elem = objects_in_use_.find(object_id);
   return (elem != objects_in_use_.end());
 }
@@ -384,6 +388,8 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
 Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
                                   const uint8_t* metadata, int64_t metadata_size,
                                   std::shared_ptr<Buffer>* data, int device_num) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
                    << data_size << " and metadata size " << metadata_size;
   RETURN_NOT_OK(
@@ -451,8 +457,9 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
 Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
                                          const std::string& data,
                                          const std::string& metadata) {
-  ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
 
+  ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
   // Compute the object hash.
   static unsigned char digest[kDigestSize];
   // CreateAndSeal currently only supports device_num = 0, which corresponds to
@@ -608,6 +615,8 @@ Status PlasmaClient::Impl::GetBuffers(
 
 Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,
                                int64_t timeout_ms, std::vector<ObjectBuffer>* out) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   const auto wrap_buffer = [=](const ObjectID& object_id,
                                const std::shared_ptr<Buffer>& buffer) {
     return std::make_shared<PlasmaBuffer>(shared_from_this(), object_id, buffer);
@@ -619,6 +628,8 @@ Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,
 
 Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects,
                                int64_t timeout_ms, ObjectBuffer* out) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   const auto wrap_buffer = [](const ObjectID& object_id,
                               const std::shared_ptr<Buffer>& buffer) { return buffer; };
   return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out);
@@ -635,6 +646,8 @@ Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) {
 }
 
 Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   // If the client is already disconnected, ignore release requests.
   if (store_conn_ < 0) {
     return Status::OK();
@@ -672,6 +685,8 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
 
 // This method is used to query whether the plasma store contains an object.
 Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   // Check if we already have a reference to the object.
   if (objects_in_use_.count(object_id) > 0) {
     *has_object = 1;
@@ -690,6 +705,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
 }
 
 Status PlasmaClient::Impl::List(ObjectTable* objects) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
   RETURN_NOT_OK(SendListRequest(store_conn_));
   std::vector<uint8_t> buffer;
   RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer));
@@ -768,6 +784,8 @@ uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t data
 }
 
 Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   // Make sure this client has a reference to the object before sending the
   // request to Plasma.
   auto object_entry = objects_in_use_.find(object_id);
@@ -794,6 +812,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
 }
 
 Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
   auto object_entry = objects_in_use_.find(object_id);
   ARROW_CHECK(object_entry != objects_in_use_.end())
       << "Plasma client called abort on an object without a reference to it";
@@ -832,6 +851,8 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
 }
 
 Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   std::vector<ObjectID> not_in_use_ids;
   for (auto& object_id : object_ids) {
     // If the object is in used, skip it.
@@ -855,6 +876,8 @@ Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
 }
 
 Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   // Send a request to the store to evict objects.
   RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes));
   // Wait for a response with the number of bytes actually evicted.
@@ -865,6 +888,8 @@ Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted)
 }
 
 Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   // Get the plasma object data. We pass in a timeout of 0 to indicate that
   // the operation should timeout immediately.
   std::vector<ObjectBuffer> object_buffers;
@@ -880,6 +905,8 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
 }
 
 Status PlasmaClient::Impl::Subscribe(int* fd) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   int sock[2];
   // Create a non-blocking socket pair. This will only be used to send
   // notifications from the Plasma store to the client.
@@ -902,6 +929,8 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
 Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
                                               int64_t* data_size,
                                               int64_t* metadata_size) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(buffer);
   ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
   memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
@@ -917,6 +946,8 @@ Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* o
 
 Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
                                            int64_t* data_size, int64_t* metadata_size) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   auto notification = ReadMessageAsync(fd);
   if (notification == NULL) {
     return Status::IOError("Failed to read object notification from Plasma socket");
@@ -927,6 +958,8 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
 Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
                                    const std::string& manager_socket_name,
                                    int release_delay, int num_retries) {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_));
   if (manager_socket_name != "") {
     return Status::NotImplemented("plasma manager is no longer supported");
@@ -944,6 +977,8 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
 }
 
 Status PlasmaClient::Impl::Disconnect() {
+  std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
   // NOTE: We purposefully do not finish sending release calls for objects in
   // use, so that we don't duplicate PlasmaClient::Release calls (when handling
   // a SIGTERM, for example).