You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/06/04 13:40:40 UTC
[arrow] branch master updated: ARROW-5285: [C++][Plasma] Implement to release GpuProcessHandle
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 135b5e3 ARROW-5285: [C++][Plasma] Implement to release GpuProcessHandle
135b5e3 is described below
commit 135b5e35380015adbcfdcd939683d7bc9603fda5
Author: shengjun.li <sh...@zilliz.com>
AuthorDate: Tue Jun 4 15:40:30 2019 +0200
ARROW-5285: [C++][Plasma] Implement to release GpuProcessHandle
Author: shengjun.li <sh...@zilliz.com>
Closes #4277 from shengjun1985/gpu and squashes the following commits:
7207ed769 <shengjun.li> ARROW-5285: implement to release GpuProcessHandle
---
cpp/src/plasma/client.cc | 40 +++++++++++++++++++++++++++++++++----
cpp/src/plasma/test/client_tests.cc | 37 ++++++++++++++++++++++++++++++++++
2 files changed, 73 insertions(+), 4 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index e88c5ca..9447e5d 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -416,12 +416,15 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
}
} else {
#ifdef PLASMA_CUDA
- std::lock_guard<std::mutex> lock(gpu_mutex);
std::shared_ptr<CudaContext> context;
RETURN_NOT_OK(manager_->GetContext(device_num - 1, &context));
GpuProcessHandle* handle = new GpuProcessHandle();
+ handle->client_count = 2;
RETURN_NOT_OK(context->OpenIpcBuffer(*object.ipc_handle, &handle->ptr));
- gpu_object_map[object_id] = handle;
+ {
+ std::lock_guard<std::mutex> lock(gpu_mutex);
+ gpu_object_map[object_id] = handle;
+ }
*data = handle->ptr;
if (metadata != NULL) {
// Copy the metadata to the buffer.
@@ -500,7 +503,11 @@ Status PlasmaClient::Impl::GetBuffers(
data + object->data_offset, object->data_size + object->metadata_size);
} else {
#ifdef PLASMA_CUDA
- physical_buf = gpu_object_map.find(object_ids[i])->second->ptr;
+ std::lock_guard<std::mutex> lock(gpu_mutex);
+ auto iter = gpu_object_map.find(object_ids[i]);
+ ARROW_CHECK(iter != gpu_object_map.end());
+ iter->second->client_count++;
+ physical_buf = iter->second->ptr;
#else
ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
#endif
@@ -568,11 +575,12 @@ Status PlasmaClient::Impl::GetBuffers(
std::shared_ptr<CudaContext> context;
RETURN_NOT_OK(manager_->GetContext(object->device_num - 1, &context));
GpuProcessHandle* obj_handle = new GpuProcessHandle();
+ obj_handle->client_count = 1;
RETURN_NOT_OK(context->OpenIpcBuffer(*object->ipc_handle, &obj_handle->ptr));
gpu_object_map[object_ids[i]] = obj_handle;
physical_buf = obj_handle->ptr;
} else {
- handle->second->client_count += 1;
+ handle->second->client_count++;
physical_buf = handle->second->ptr;
}
#else
@@ -633,6 +641,19 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
}
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end());
+
+#ifdef PLASMA_CUDA
+ if (object_entry->second->object.device_num != 0) {
+ std::lock_guard<std::mutex> lock(gpu_mutex);
+ auto iter = gpu_object_map.find(object_id);
+ ARROW_CHECK(iter != gpu_object_map.end());
+ if (--iter->second->client_count == 0) {
+ delete iter->second;
+ gpu_object_map.erase(iter);
+ }
+ }
+#endif
+
object_entry->second->count -= 1;
ARROW_CHECK(object_entry->second->count >= 0);
// Check if the client is no longer using this object.
@@ -786,6 +807,17 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
return Status::Invalid("Plasma client cannot have a reference to the buffer.");
}
+#ifdef PLASMA_CUDA
+ if (object_entry->second->object.device_num != 0) {
+ std::lock_guard<std::mutex> lock(gpu_mutex);
+ auto iter = gpu_object_map.find(object_id);
+ ARROW_CHECK(iter != gpu_object_map.end());
+ ARROW_CHECK(iter->second->client_count == 1);
+ delete iter->second;
+ gpu_object_map.erase(iter);
+ }
+#endif
+
// Send the abort request.
RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id));
// Decrease the reference count to zero, then remove the object.
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index dbe3b9f..435b687 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -591,6 +591,7 @@ TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) {
client_.Create(object_id2, data_size, metadata, metadata_size, &data, 1));
ARROW_CHECK_OK(client_.Seal(object_id2));
// Release the ref count of Create function.
+ data = nullptr;
ARROW_CHECK_OK(client_.Release(object_id1));
ARROW_CHECK_OK(client_.Release(object_id2));
// Increase the ref count by calling Get using client2_.
@@ -617,6 +618,42 @@ TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) {
ASSERT_FALSE(has_object);
}
+TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) {  // Deletes and re-creates objects under the same IDs many times.
+ const int64_t loop_times = 100;  // iterations of the delete/re-create phase
+ const int64_t object_num = 5;  // number of distinct object IDs that get reused
+ const int64_t data_size = 40;  // data_size argument passed to every Create call
+ // The generated IDs are kept so the second phase can re-create objects under them.
+ std::vector<ObjectID> object_ids;
+
+ // Phase 1: create, seal and release one object per freshly generated ID.
+ for (int64_t i = 0; i < object_num; i++) {
+ object_ids.push_back(random_object_id());
+ ObjectID& object_id = object_ids[i];
+ // The trailing Create argument 1 is presumably the device number selecting a GPU -- TODO confirm.
+ std::shared_ptr<Buffer> data;
+ ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, 1));
+ ARROW_CHECK_OK(client_.Seal(object_id));
+ ARROW_CHECK_OK(client_.Release(object_id));  // NOTE(review): `data` is still set here, unlike in phase 2.
+ }
+
+ // Phase 2: loop_times rounds, cycling through the IDs: delete, then create again under the same ID.
+ for (int64_t i = 0; i < loop_times; i++) {
+ ObjectID& object_id = object_ids[i % object_num];
+
+ ARROW_CHECK_OK(client_.Delete(object_id));
+
+ std::shared_ptr<Buffer> data;
+ ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, 1));
+ ARROW_CHECK_OK(client_.Seal(object_id));
+ // Reset the local buffer pointer before Release; presumably so no buffer reference outlives it -- confirm.
+ data = nullptr;
+ ARROW_CHECK_OK(client_.Release(object_id));
+ }
+
+ // Cleanup: delete every object in one call, passing the whole ID vector.
+ ARROW_CHECK_OK(client_.Delete(object_ids));
+}
+
TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
ObjectID object_id = random_object_id();
std::vector<ObjectBuffer> object_buffers;