You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/06/04 13:40:40 UTC

[arrow] branch master updated: ARROW-5285: [C++][Plasma] Implement to release GpuProcessHandle

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 135b5e3  ARROW-5285: [C++][Plasma] Implement to release GpuProcessHandle
135b5e3 is described below

commit 135b5e35380015adbcfdcd939683d7bc9603fda5
Author: shengjun.li <sh...@zilliz.com>
AuthorDate: Tue Jun 4 15:40:30 2019 +0200

    ARROW-5285: [C++][Plasma] Implement to release GpuProcessHandle
    
    Author: shengjun.li <sh...@zilliz.com>
    
    Closes #4277 from shengjun1985/gpu and squashes the following commits:
    
    7207ed769 <shengjun.li> ARROW-5285:  implement to release GpuProcessHandle
---
 cpp/src/plasma/client.cc            | 40 +++++++++++++++++++++++++++++++++----
 cpp/src/plasma/test/client_tests.cc | 37 ++++++++++++++++++++++++++++++++++
 2 files changed, 73 insertions(+), 4 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index e88c5ca..9447e5d 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -416,12 +416,15 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
     }
   } else {
 #ifdef PLASMA_CUDA
-    std::lock_guard<std::mutex> lock(gpu_mutex);
     std::shared_ptr<CudaContext> context;
     RETURN_NOT_OK(manager_->GetContext(device_num - 1, &context));
     GpuProcessHandle* handle = new GpuProcessHandle();
+    handle->client_count = 2;
     RETURN_NOT_OK(context->OpenIpcBuffer(*object.ipc_handle, &handle->ptr));
-    gpu_object_map[object_id] = handle;
+    {
+      std::lock_guard<std::mutex> lock(gpu_mutex);
+      gpu_object_map[object_id] = handle;
+    }
     *data = handle->ptr;
     if (metadata != NULL) {
       // Copy the metadata to the buffer.
@@ -500,7 +503,11 @@ Status PlasmaClient::Impl::GetBuffers(
             data + object->data_offset, object->data_size + object->metadata_size);
       } else {
 #ifdef PLASMA_CUDA
-        physical_buf = gpu_object_map.find(object_ids[i])->second->ptr;
+        std::lock_guard<std::mutex> lock(gpu_mutex);
+        auto iter = gpu_object_map.find(object_ids[i]);
+        ARROW_CHECK(iter != gpu_object_map.end());
+        iter->second->client_count++;
+        physical_buf = iter->second->ptr;
 #else
         ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
 #endif
@@ -568,11 +575,12 @@ Status PlasmaClient::Impl::GetBuffers(
           std::shared_ptr<CudaContext> context;
           RETURN_NOT_OK(manager_->GetContext(object->device_num - 1, &context));
           GpuProcessHandle* obj_handle = new GpuProcessHandle();
+          obj_handle->client_count = 1;
           RETURN_NOT_OK(context->OpenIpcBuffer(*object->ipc_handle, &obj_handle->ptr));
           gpu_object_map[object_ids[i]] = obj_handle;
           physical_buf = obj_handle->ptr;
         } else {
-          handle->second->client_count += 1;
+          handle->second->client_count++;
           physical_buf = handle->second->ptr;
         }
 #else
@@ -633,6 +641,19 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
   }
   auto object_entry = objects_in_use_.find(object_id);
   ARROW_CHECK(object_entry != objects_in_use_.end());
+
+#ifdef PLASMA_CUDA
+  if (object_entry->second->object.device_num != 0) {
+    std::lock_guard<std::mutex> lock(gpu_mutex);
+    auto iter = gpu_object_map.find(object_id);
+    ARROW_CHECK(iter != gpu_object_map.end());
+    if (--iter->second->client_count == 0) {
+      delete iter->second;
+      gpu_object_map.erase(iter);
+    }
+  }
+#endif
+
   object_entry->second->count -= 1;
   ARROW_CHECK(object_entry->second->count >= 0);
   // Check if the client is no longer using this object.
@@ -786,6 +807,17 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
     return Status::Invalid("Plasma client cannot have a reference to the buffer.");
   }
 
+#ifdef PLASMA_CUDA
+  if (object_entry->second->object.device_num != 0) {
+    std::lock_guard<std::mutex> lock(gpu_mutex);
+    auto iter = gpu_object_map.find(object_id);
+    ARROW_CHECK(iter != gpu_object_map.end());
+    ARROW_CHECK(iter->second->client_count == 1);
+    delete iter->second;
+    gpu_object_map.erase(iter);
+  }
+#endif
+
   // Send the abort request.
   RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id));
   // Decrease the reference count to zero, then remove the object.
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index dbe3b9f..435b687 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -591,6 +591,7 @@ TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) {
       client_.Create(object_id2, data_size, metadata, metadata_size, &data, 1));
   ARROW_CHECK_OK(client_.Seal(object_id2));
   // Release the ref count of Create function.
+  data = nullptr;
   ARROW_CHECK_OK(client_.Release(object_id1));
   ARROW_CHECK_OK(client_.Release(object_id2));
   // Increase the ref count by calling Get using client2_.
@@ -617,6 +618,42 @@ TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) {
   ASSERT_FALSE(has_object);
 }
 
+TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) {
+  const int64_t loop_times = 100;
+  const int64_t object_num = 5;
+  const int64_t data_size = 40;
+
+  std::vector<ObjectID> object_ids;
+
+  // create new gpu objects
+  for (int64_t i = 0; i < object_num; i++) {
+    object_ids.push_back(random_object_id());
+    ObjectID& object_id = object_ids[i];
+
+    std::shared_ptr<Buffer> data;
+    ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, 1));
+    ARROW_CHECK_OK(client_.Seal(object_id));
+    ARROW_CHECK_OK(client_.Release(object_id));
+  }
+
+  // delete and create again
+  for (int64_t i = 0; i < loop_times; i++) {
+    ObjectID& object_id = object_ids[i % object_num];
+
+    ARROW_CHECK_OK(client_.Delete(object_id));
+
+    std::shared_ptr<Buffer> data;
+    ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, 1));
+    ARROW_CHECK_OK(client_.Seal(object_id));
+
+    data = nullptr;
+    ARROW_CHECK_OK(client_.Release(object_id));
+  }
+
+  // delete all
+  ARROW_CHECK_OK(client_.Delete(object_ids));
+}
+
 TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
   ObjectID object_id = random_object_id();
   std::vector<ObjectBuffer> object_buffers;