You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2019/04/29 21:58:44 UTC

[arrow] branch master updated: ARROW-5186 [Plasma] Fix crash caused by improper free on CUDA memory

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 15c86e6  ARROW-5186 [Plasma] Fix crash caused by improper free on CUDA memory
15c86e6 is described below

commit 15c86e6cfecef0aa742b446bd7cdcc1ed1e0b180
Author: shengjun.li <sh...@zilliz.com>
AuthorDate: Mon Apr 29 14:58:31 2019 -0700

    ARROW-5186 [Plasma] Fix crash caused by improper free on CUDA memory
    
    Author: shengjun.li <sh...@zilliz.com>
    
    Closes #4177 from shengjun1985/master and squashes the following commits:
    
    f384a9419 <shengjun.li> ARROW-5186  Fix crash caused by improper free on CUDA memory
    48e0fda9e <shengjun.li> ARROW-5186  fix crash on delete gpu memory
---
 cpp/src/arrow/gpu/cuda_context.h    |  7 +++++-
 cpp/src/plasma/store.cc             | 16 ++++++++++++-
 cpp/src/plasma/store.h              |  2 ++
 cpp/src/plasma/test/client_tests.cc | 46 +++++++++++++++++++++++++++++++++++++
 4 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 682cbd8..99c3fc2 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -133,6 +133,12 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
   /// memory is page-locked (using cudaHostRegister).
   Status GetDeviceAddress(uint8_t* addr, uint8_t** devaddr);
 
+  /// \brief Release CUDA memory on GPU device for this context
+  /// \param[in] device_ptr the buffer address
+  /// \param[in] nbytes number of bytes
+  /// \return Status
+  Status Free(void* device_ptr, int64_t nbytes);
+
  private:
   CudaContext();
 
@@ -143,7 +149,6 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
   Status CopyDeviceToDevice(void* dst, const void* src, int64_t nbytes);
   Status CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>& dst_ctx, void* dst,
                                    const void* src, int64_t nbytes);
-  Status Free(void* device_ptr, int64_t nbytes);
 
   class CudaContextImpl;
   std::unique_ptr<CudaContextImpl> impl_;
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 4d86653..680714f 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -194,6 +194,13 @@ Status PlasmaStore::AllocateCudaMemory(
   // The IPC handle will keep the buffer memory alive
   return cuda_buffer->ExportForIpc(out_ipc_handle);
 }
+
+Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t* pointer) {
+  std::shared_ptr<CudaContext> context_;
+  RETURN_NOT_OK(manager_->GetContext(device_num - 1, &context_));
+  RETURN_NOT_OK(context_->Free(pointer, size));
+  return Status::OK();
+}
 #endif
 
 // Create a new object buffer in the hash table.
@@ -532,7 +539,14 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id,
 
 void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) {
   auto& object = store_info_.objects[object_id];
-  PlasmaAllocator::Free(object->pointer, object->data_size + object->metadata_size);
+  auto buff_size = object->data_size + object->metadata_size;
+  if (object->device_num == 0) {
+    PlasmaAllocator::Free(object->pointer, buff_size);
+  } else {
+#ifdef PLASMA_CUDA
+    ARROW_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer));
+#endif
+  }
   store_info_.objects.erase(object_id);
 }
 
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 53464ab..26b49f0 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -218,6 +218,8 @@ class PlasmaStore {
 #ifdef PLASMA_CUDA
   Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer,
                             std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle);
+
+  Status FreeCudaMemory(int device_num, int64_t size, uint8_t* out_pointer);
 #endif
 
   /// Event loop of the plasma store.
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 4dd0c06..fd91b32 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -538,6 +538,52 @@ TEST_F(TestPlasmaStore, GetGPUTest) {
   AssertCudaRead(object_buffers[0].metadata, {42});
 }
 
+TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) {
+  ObjectID object_id1 = random_object_id();
+  ObjectID object_id2 = random_object_id();
+
+  // Test for deleting non-existent objects.
+  Status result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
+  ARROW_CHECK_OK(result);
+  // Test for the object being in local Plasma store.
+  // First create object.
+  int64_t data_size = 100;
+  uint8_t metadata[] = {5};
+  int64_t metadata_size = sizeof(metadata);
+  std::shared_ptr<Buffer> data;
+  ARROW_CHECK_OK(
+      client_.Create(object_id1, data_size, metadata, metadata_size, &data, 1));
+  ARROW_CHECK_OK(client_.Seal(object_id1));
+  ARROW_CHECK_OK(
+      client_.Create(object_id2, data_size, metadata, metadata_size, &data, 1));
+  ARROW_CHECK_OK(client_.Seal(object_id2));
+  // Release the ref count of Create function.
+  ARROW_CHECK_OK(client_.Release(object_id1));
+  ARROW_CHECK_OK(client_.Release(object_id2));
+  // Increase the ref count by calling Get using client2_.
+  std::vector<ObjectBuffer> object_buffers;
+  ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers));
+  // Objects are still used by client2_.
+  result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
+  ARROW_CHECK_OK(result);
+  // The object is used and it should not be deleted right now.
+  bool has_object = false;
+  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
+  ASSERT_TRUE(has_object);
+  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
+  ASSERT_TRUE(has_object);
+  // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer).
+  // client2_ won't send the release request immediately because the trigger
+  // condition is not reached. The release is only added to release cache.
+  object_buffers.clear();
+  // Delete the objects.
+  result = client2_.Delete(std::vector<ObjectID>{object_id1, object_id2});
+  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
+  ASSERT_FALSE(has_object);
+  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
+  ASSERT_FALSE(has_object);
+}
+
 TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
   ObjectID object_id = random_object_id();
   std::vector<ObjectBuffer> object_buffers;