You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/19 14:00:11 UTC
[arrow] branch master updated: ARROW-5533: [C++] [Plasma] make plasma client thread safe
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 726f90f ARROW-5533: [C++] [Plasma] make plasma client thread safe
726f90f is described below
commit 726f90f982886b55381414a537baf3c37af0b32f
Author: Zhijun Fu <pi...@antfin.com>
AuthorDate: Wed Jun 19 08:59:59 2019 -0500
ARROW-5533: [C++] [Plasma] make plasma client thread safe
Make the Plasma client thread-safe, so that when the PlasmaBuffer for an object is destroyed, the resulting call to plasma_client.Release(object_id) is guaranteed to be properly synchronized. See [this discussion](https://github.com/ray-project/ray/pull/4922#discussion_r291422782) for background.
@robertnishihara @pcmoritz
Author: Zhijun Fu <pi...@antfin.com>
Closes #4503 from zhijunfu/plasma-client-mutex and squashes the following commits:
ee2575a21 <Zhijun Fu> fix merge
217c31d77 <Zhijun Fu> Merge remote-tracking branch 'origin/plasma-client-mutex' into plasma-client-mutex
7ea924a9d <Zhijun Fu> fix unintended changes
c82ad9367 <Zhijun Fu> change mutex to recursive_mutex
ed7392c2f <Zhijun Fu> add lock to plasma client
6249e98d3 <Zhijun Fu> change mutex to recursive_mutex
54bbdf4a0 <Zhijun Fu> Merge branch 'master' of https://github.com/apache/arrow into plasma-client-mutex
afdbe688e <Zhijun Fu> add lock to plasma client
---
cpp/src/plasma/client.cc | 37 ++++++++++++++++++++++++++++++++++++-
1 file changed, 36 insertions(+), 1 deletion(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 9447e5d..ce9795d 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -301,6 +301,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
int64_t store_capacity_;
/// A hash set to record the ids that users want to delete but still in use.
std::unordered_set<ObjectID> deletion_cache_;
+ /// A mutex which protects this class.
+ std::recursive_mutex client_mutex_;
#ifdef PLASMA_CUDA
/// Cuda Device Manager.
@@ -341,6 +343,8 @@ uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
}
bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
const auto elem = objects_in_use_.find(object_id);
return (elem != objects_in_use_.end());
}
@@ -384,6 +388,8 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
std::shared_ptr<Buffer>* data, int device_num) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
<< data_size << " and metadata size " << metadata_size;
RETURN_NOT_OK(
@@ -451,8 +457,9 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
const std::string& data,
const std::string& metadata) {
- ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+ ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
// Compute the object hash.
static unsigned char digest[kDigestSize];
// CreateAndSeal currently only supports device_num = 0, which corresponds to
@@ -608,6 +615,8 @@ Status PlasmaClient::Impl::GetBuffers(
Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,
int64_t timeout_ms, std::vector<ObjectBuffer>* out) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
const auto wrap_buffer = [=](const ObjectID& object_id,
const std::shared_ptr<Buffer>& buffer) {
return std::make_shared<PlasmaBuffer>(shared_from_this(), object_id, buffer);
@@ -619,6 +628,8 @@ Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,
Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects,
int64_t timeout_ms, ObjectBuffer* out) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
const auto wrap_buffer = [](const ObjectID& object_id,
const std::shared_ptr<Buffer>& buffer) { return buffer; };
return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out);
@@ -635,6 +646,8 @@ Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) {
}
Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
// If the client is already disconnected, ignore release requests.
if (store_conn_ < 0) {
return Status::OK();
@@ -672,6 +685,8 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
// This method is used to query whether the plasma store contains an object.
Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
// Check if we already have a reference to the object.
if (objects_in_use_.count(object_id) > 0) {
*has_object = 1;
@@ -690,6 +705,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
}
Status PlasmaClient::Impl::List(ObjectTable* objects) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
RETURN_NOT_OK(SendListRequest(store_conn_));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer));
@@ -768,6 +784,8 @@ uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t data
}
Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
// Make sure this client has a reference to the object before sending the
// request to Plasma.
auto object_entry = objects_in_use_.find(object_id);
@@ -794,6 +812,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
}
Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end())
<< "Plasma client called abort on an object without a reference to it";
@@ -832,6 +851,8 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
}
Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
std::vector<ObjectID> not_in_use_ids;
for (auto& object_id : object_ids) {
// If the object is in used, skip it.
@@ -855,6 +876,8 @@ Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
}
Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
// Send a request to the store to evict objects.
RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes));
// Wait for a response with the number of bytes actually evicted.
@@ -865,6 +888,8 @@ Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted)
}
Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
// Get the plasma object data. We pass in a timeout of 0 to indicate that
// the operation should timeout immediately.
std::vector<ObjectBuffer> object_buffers;
@@ -880,6 +905,8 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
}
Status PlasmaClient::Impl::Subscribe(int* fd) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
int sock[2];
// Create a non-blocking socket pair. This will only be used to send
// notifications from the Plasma store to the client.
@@ -902,6 +929,8 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
int64_t* data_size,
int64_t* metadata_size) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(buffer);
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
@@ -917,6 +946,8 @@ Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* o
Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
int64_t* data_size, int64_t* metadata_size) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
auto notification = ReadMessageAsync(fd);
if (notification == NULL) {
return Status::IOError("Failed to read object notification from Plasma socket");
@@ -927,6 +958,8 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
const std::string& manager_socket_name,
int release_delay, int num_retries) {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_));
if (manager_socket_name != "") {
return Status::NotImplemented("plasma manager is no longer supported");
@@ -944,6 +977,8 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
}
Status PlasmaClient::Impl::Disconnect() {
+ std::lock_guard<std::recursive_mutex> guard(client_mutex_);
+
// NOTE: We purposefully do not finish sending release calls for objects in
// use, so that we don't duplicate PlasmaClient::Release calls (when handling
// a SIGTERM, for example).