You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/04/04 17:52:18 UTC

[arrow] branch master updated: ARROW-2195: [Plasma] Return auto-releasing buffers

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new cf39686  ARROW-2195: [Plasma] Return auto-releasing buffers
cf39686 is described below

commit cf396867df6f1f93948c69ce10ceb0f95e399242
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Apr 4 10:51:51 2018 -0700

    ARROW-2195: [Plasma] Return auto-releasing buffers
    
    On the C++ side, add a new PlasmaClient::Get() overload (named GetAuto() earlier in this
    PR) that returns buffers which release the corresponding object on destruction.
    
    On the Python side, return such buffers in PlasmaClient.get_buffers().
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #1807 from pitrou/ARROW-2195-plasma-buffers and squashes the following commits:
    
    e3747cf7 <Antoine Pitrou> Remove XXX comments
    ac2d84c3 <Antoine Pitrou> Migrate PlasmaClient::Get()
    927a5352 <Antoine Pitrou> Use GetAuto() in plasma.pyx, not Get()
    562abc5d <Antoine Pitrou> Use ARROW_UNUSED()
    4d33403f <Antoine Pitrou> Move FRIEND_TEST to the Arrow codebase
    123ea839 <Antoine Pitrou> Try to fix odd failure
    43b377f8 <Antoine Pitrou> Add tests for device_num
    dee2f5dd <Antoine Pitrou> Add Python test
    30a439d1 <Antoine Pitrou> Cleanups
    483bbdbf <Antoine Pitrou> Allow getting back CUDA buffer from buffer
    c2af4d36 <Antoine Pitrou> ARROW-2195:  Return auto-releasing buffers
---
 cpp/src/arrow/gpu/cuda-test.cc      |  43 +++++++
 cpp/src/arrow/gpu/cuda_memory.cc    |  38 +++++-
 cpp/src/arrow/gpu/cuda_memory.h     |  11 +-
 cpp/src/arrow/test-util.h           |   8 ++
 cpp/src/arrow/util/macros.h         |  25 ++++
 cpp/src/plasma/client.cc            | 146 ++++++++++++++---------
 cpp/src/plasma/client.h             |  58 ++++++---
 cpp/src/plasma/test/client_tests.cc | 226 +++++++++++++++++++++++++-----------
 python/pyarrow/plasma.pyx           |  40 +++----
 python/pyarrow/tests/test_plasma.py |  37 +++++-
 10 files changed, 456 insertions(+), 176 deletions(-)

diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index ae411c9..04e1f92 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -80,6 +80,49 @@ TEST_F(TestCudaBuffer, CopyFromHost) {
   AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
 }
 
+TEST_F(TestCudaBuffer, FromBuffer) {
+  const int64_t kSize = 1000;
+  // Initialize device buffer with random data
+  std::shared_ptr<PoolBuffer> host_buffer;
+  std::shared_ptr<CudaBuffer> device_buffer;
+  ASSERT_OK(context_->Allocate(kSize, &device_buffer));
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
+  ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), 1000));
+  // Sanity check
+  AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
+
+  // Get generic Buffer from device buffer
+  std::shared_ptr<Buffer> buffer;
+  std::shared_ptr<CudaBuffer> result;
+  buffer = std::static_pointer_cast<Buffer>(device_buffer);
+  ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+  ASSERT_EQ(result->size(), kSize);
+  ASSERT_EQ(result->is_mutable(), true);
+  ASSERT_EQ(result->mutable_data(), buffer->mutable_data());
+  AssertCudaBufferEquals(*result, host_buffer->data(), kSize);
+
+  buffer = SliceBuffer(device_buffer, 0, kSize);
+  ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+  ASSERT_EQ(result->size(), kSize);
+  ASSERT_EQ(result->is_mutable(), false);
+  AssertCudaBufferEquals(*result, host_buffer->data(), kSize);
+
+  buffer = SliceMutableBuffer(device_buffer, 0, kSize);
+  ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+  ASSERT_EQ(result->size(), kSize);
+  ASSERT_EQ(result->is_mutable(), true);
+  ASSERT_EQ(result->mutable_data(), buffer->mutable_data());
+  AssertCudaBufferEquals(*result, host_buffer->data(), kSize);
+
+  buffer = SliceMutableBuffer(device_buffer, 3, kSize - 10);
+  buffer = SliceMutableBuffer(buffer, 8, kSize - 20);
+  ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+  ASSERT_EQ(result->size(), kSize - 20);
+  ASSERT_EQ(result->is_mutable(), true);
+  ASSERT_EQ(result->mutable_data(), buffer->mutable_data());
+  AssertCudaBufferEquals(*result, host_buffer->data() + 11, kSize - 20);
+}
+
 // IPC only supported on Linux
 #if defined(__linux)
 
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 183cbcb..a245509 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -98,7 +98,34 @@ CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t
     : Buffer(parent, offset, size),
       context_(parent->context()),
       own_data_(false),
-      is_ipc_(false) {}
+      is_ipc_(false) {
+  if (parent->is_mutable()) {
+    is_mutable_ = true;
+    mutable_data_ = const_cast<uint8_t*>(data_);
+  }
+}
+
+Status CudaBuffer::FromBuffer(std::shared_ptr<Buffer> buffer,
+                              std::shared_ptr<CudaBuffer>* out) {
+  int64_t offset = 0, size = buffer->size();
+  bool is_mutable = buffer->is_mutable();
+  // The original CudaBuffer may have been wrapped in another Buffer
+  // (for example through slicing).
+  while (!(*out = std::dynamic_pointer_cast<CudaBuffer>(buffer))) {
+    const std::shared_ptr<Buffer> parent = buffer->parent();
+    if (!parent) {
+      return Status::TypeError("buffer is not backed by a CudaBuffer");
+    }
+    offset += buffer->data() - parent->data();
+    buffer = parent;
+  }
+  // Re-slice to represent the same memory area
+  if (offset != 0 || (*out)->size() != size || !is_mutable) {
+    *out = std::make_shared<CudaBuffer>(*out, offset, size);
+    (*out)->is_mutable_ = is_mutable;
+  }
+  return Status::OK();
+}
 
 Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
                               void* out) const {
@@ -129,8 +156,13 @@ CudaHostBuffer::~CudaHostBuffer() {
 // ----------------------------------------------------------------------
 // CudaBufferReader
 
-CudaBufferReader::CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer)
-    : io::BufferReader(buffer), cuda_buffer_(buffer), context_(buffer->context()) {}
+CudaBufferReader::CudaBufferReader(const std::shared_ptr<Buffer>& buffer)
+    : io::BufferReader(buffer) {
+  if (!CudaBuffer::FromBuffer(buffer, &cuda_buffer_).ok()) {
+    throw std::bad_cast();
+  }
+  context_ = cuda_buffer_->context();
+}
 
 CudaBufferReader::~CudaBufferReader() {}
 
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 3f3dd2f..7eb8b88 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -46,6 +46,15 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
 
   ~CudaBuffer();
 
+  /// \brief Convert a generic buffer back into a CudaBuffer
+  /// \param[in] buffer buffer to convert
+  /// \param[out] out conversion result
+  /// \return Status
+  ///
+  /// This function returns an error if the buffer isn't backed by GPU memory
+  static Status FromBuffer(std::shared_ptr<Buffer> buffer,
+                           std::shared_ptr<CudaBuffer>* out);
+
   /// \brief Copy memory from GPU device to CPU host
   /// \param[out] out a pre-allocated output buffer
   /// \return Status
@@ -123,7 +132,7 @@ class ARROW_EXPORT CudaIpcMemHandle {
 /// able to do anything other than pointer arithmetic on the returned buffers
 class ARROW_EXPORT CudaBufferReader : public io::BufferReader {
  public:
-  explicit CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer);
+  explicit CudaBufferReader(const std::shared_ptr<Buffer>& buffer);
   ~CudaBufferReader();
 
   /// \brief Read bytes into pre-allocated host memory
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index fdd42a6..58f82b3 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -324,6 +324,14 @@ void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual
   }
 }
 
+void AssertBufferEqual(const Buffer& buffer, const std::vector<uint8_t>& expected) {
+  ASSERT_EQ(buffer.size(), expected.size());
+  const uint8_t* buffer_data = buffer.data();
+  for (size_t i = 0; i < expected.size(); ++i) {
+    ASSERT_EQ(buffer_data[i], expected[i]);
+  }
+}
+
 void PrintColumn(const Column& col, std::stringstream* ss) {
   const ChunkedArray& carr = *col.data();
   for (int i = 0; i < carr.num_chunks(); ++i) {
diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h
index 8b1125d..d900256 100644
--- a/cpp/src/arrow/util/macros.h
+++ b/cpp/src/arrow/util/macros.h
@@ -94,4 +94,29 @@
 #endif
 #endif  // !defined(MANUALLY_ALIGNED_STRUCT)
 
+// ----------------------------------------------------------------------
+// From googletest
+// (also in parquet-cpp)
+
+// When you need to test the private or protected members of a class,
+// use the FRIEND_TEST macro to declare your tests as friends of the
+// class.  For example:
+//
+// class MyClass {
+//  private:
+//   void MyMethod();
+//   FRIEND_TEST(MyClassTest, MyMethod);
+// };
+//
+// class MyClassTest : public testing::Test {
+//   // ...
+// };
+//
+// TEST_F(MyClassTest, MyMethod) {
+//   // Can call MyClass::MyMethod() here.
+// }
+
+#define FRIEND_TEST(test_case_name, test_name) \
+  friend class test_case_name##_##test_name##_Test
+
 #endif  // ARROW_UTIL_MACROS_H
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index a9bbd8c..9635e70 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -72,6 +72,27 @@ constexpr int64_t kThreadPoolSize = 8;
 constexpr int64_t kBytesInMB = 1 << 20;
 static std::vector<std::thread> threadpool_(kThreadPoolSize);
 
+/// A Buffer class that automatically releases the backing plasma object
+/// when it goes out of scope.
+class PlasmaBuffer : public Buffer {
+ public:
+  ~PlasmaBuffer();
+
+  PlasmaBuffer(PlasmaClient* client, const ObjectID& object_id,
+               const std::shared_ptr<Buffer>& buffer)
+      : Buffer(buffer, 0, buffer->size()), client_(client), object_id_(object_id) {
+    if (buffer->is_mutable()) {
+      is_mutable_ = true;
+    }
+  }
+
+ private:
+  PlasmaClient* client_;
+  ObjectID object_id_;
+};
+
+PlasmaBuffer::~PlasmaBuffer() { ARROW_UNUSED(client_->Release(object_id_)); }
+
 struct ObjectInUseEntry {
   /// A count of the number of times this client has called PlasmaClient::Create
   /// or
@@ -144,6 +165,11 @@ uint8_t* PlasmaClient::lookup_mmapped_file(int store_fd_val) {
   return entry->second.pointer;
 }
 
+bool PlasmaClient::IsInUse(const ObjectID& object_id) {
+  const auto elem = objects_in_use_.find(object_id);
+  return (elem != objects_in_use_.end());
+}
+
 void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObject* object,
                                           bool is_sealed) {
   // Increment the count of the object to track the fact that it is being used.
@@ -182,7 +208,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec
 }
 
 Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
-                            uint8_t* metadata, int64_t metadata_size,
+                            const uint8_t* metadata, int64_t metadata_size,
                             std::shared_ptr<Buffer>* data, int device_num) {
   ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
                    << data_size << " and metadata size " << metadata_size;
@@ -247,49 +273,45 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
   return Status::OK();
 }
 
-Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
-                         int64_t timeout_ms, ObjectBuffer* object_buffers) {
+Status PlasmaClient::GetBuffers(
+    const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
+    const std::function<std::shared_ptr<Buffer>(
+        const ObjectID&, const std::shared_ptr<Buffer>&)>& wrap_buffer,
+    ObjectBuffer* object_buffers) {
   // Fill out the info for the objects that are already in use locally.
   bool all_present = true;
-  for (int i = 0; i < num_objects; ++i) {
+  for (int64_t i = 0; i < num_objects; ++i) {
     auto object_entry = objects_in_use_.find(object_ids[i]);
     if (object_entry == objects_in_use_.end()) {
       // This object is not currently in use by this client, so we need to send
       // a request to the store.
       all_present = false;
-      // Make a note to ourselves that the object is not present.
-      object_buffers[i].data_size = -1;
     } else {
       // NOTE: If the object is still unsealed, we will deadlock, since we must
       // have been the one who created it.
       ARROW_CHECK(object_entry->second->is_sealed)
           << "Plasma client called get on an unsealed object that it created";
       PlasmaObject* object = &object_entry->second->object;
+      std::shared_ptr<Buffer> physical_buf;
+
       if (object->device_num == 0) {
         uint8_t* data = lookup_mmapped_file(object->store_fd);
-        object_buffers[i].data =
-            std::make_shared<Buffer>(data + object->data_offset, object->data_size);
-        object_buffers[i].metadata = std::make_shared<Buffer>(
-            data + object->data_offset + object->data_size, object->metadata_size);
+        physical_buf = std::make_shared<Buffer>(
+            data + object->data_offset, object->data_size + object->metadata_size);
       } else {
 #ifdef PLASMA_GPU
-        std::shared_ptr<CudaBuffer> gpu_handle =
-            gpu_object_map.find(object_ids[i])->second->ptr;
-        object_buffers[i].data =
-            std::make_shared<CudaBuffer>(gpu_handle, 0, object->data_size);
-        object_buffers[i].metadata = std::make_shared<CudaBuffer>(
-            gpu_handle, object->data_size, object->metadata_size);
+        physical_buf = gpu_object_map.find(object_ids[i])->second->ptr;
 #else
         ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
 #endif
       }
-      object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata_size = object->metadata_size;
+      physical_buf = wrap_buffer(object_ids[i], physical_buf);
+      object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size);
+      object_buffers[i].metadata =
+          SliceBuffer(physical_buf, object->data_size, object->metadata_size);
       object_buffers[i].device_num = object->device_num;
       // Increment the count of the number of instances of this object that this
-      // client is using. A call to PlasmaClient::Release is required to
-      // decrement this
-      // count. Cache the reference to the object.
+      // client is using. Cache the reference to the object.
       increment_object_count(object_ids[i], object, true);
     }
   }
@@ -300,7 +322,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
 
   // If we get here, then the objects aren't all currently in use by this
   // client, so we need to send a request to the plasma store.
-  RETURN_NOT_OK(SendGetRequest(store_conn_, object_ids, num_objects, timeout_ms));
+  RETURN_NOT_OK(SendGetRequest(store_conn_, &object_ids[0], num_objects, timeout_ms));
   std::vector<uint8_t> buffer;
   RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaGetReply, &buffer));
   std::vector<ObjectID> received_object_ids(num_objects);
@@ -320,10 +342,10 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
     lookup_or_mmap(fd, store_fds[i], mmap_sizes[i]);
   }
 
-  for (int i = 0; i < num_objects; ++i) {
+  for (int64_t i = 0; i < num_objects; ++i) {
     DCHECK(received_object_ids[i] == object_ids[i]);
     object = &object_data[i];
-    if (object_buffers[i].data_size != -1) {
+    if (object_buffers[i].data) {
       // If the object was already in use by the client, then the store should
       // have returned it.
       DCHECK_NE(object->data_size, -1);
@@ -334,56 +356,67 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
     // If we are here, the object was not currently in use, so we need to
     // process the reply from the object store.
     if (object->data_size != -1) {
+      std::shared_ptr<Buffer> physical_buf;
       if (object->device_num == 0) {
         uint8_t* data = lookup_mmapped_file(object->store_fd);
-        // Finish filling out the return values.
-        object_buffers[i].data =
-            std::make_shared<Buffer>(data + object->data_offset, object->data_size);
-        object_buffers[i].metadata = std::make_shared<Buffer>(
-            data + object->data_offset + object->data_size, object->metadata_size);
+        physical_buf = std::make_shared<Buffer>(
+            data + object->data_offset, object->data_size + object->metadata_size);
       } else {
 #ifdef PLASMA_GPU
         std::lock_guard<std::mutex> lock(gpu_mutex);
         auto handle = gpu_object_map.find(object_ids[i]);
-        std::shared_ptr<CudaBuffer> gpu_handle;
         if (handle == gpu_object_map.end()) {
           std::shared_ptr<CudaContext> context;
           RETURN_NOT_OK(manager_->GetContext(object->device_num - 1, &context));
           GpuProcessHandle* obj_handle = new GpuProcessHandle();
           RETURN_NOT_OK(context->OpenIpcBuffer(*object->ipc_handle, &obj_handle->ptr));
           gpu_object_map[object_ids[i]] = obj_handle;
-          gpu_handle = obj_handle->ptr;
+          physical_buf = obj_handle->ptr;
         } else {
           handle->second->client_count += 1;
-          gpu_handle = handle->second->ptr;
+          physical_buf = handle->second->ptr;
         }
-        object_buffers[i].data =
-            std::make_shared<CudaBuffer>(gpu_handle, 0, object->data_size);
-        object_buffers[i].metadata = std::make_shared<CudaBuffer>(
-            gpu_handle, object->data_size, object->metadata_size);
 #else
         ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
 #endif
       }
-      object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata_size = object->metadata_size;
+      // Finish filling out the return values.
+      physical_buf = wrap_buffer(object_ids[i], physical_buf);
+      object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size);
+      object_buffers[i].metadata =
+          SliceBuffer(physical_buf, object->data_size, object->metadata_size);
       object_buffers[i].device_num = object->device_num;
       // Increment the count of the number of instances of this object that this
-      // client is using. A call to PlasmaClient::Release is required to
-      // decrement this
-      // count. Cache the reference to the object.
+      // client is using. Cache the reference to the object.
       increment_object_count(received_object_ids[i], object, true);
     } else {
-      // The object was not retrieved. Make sure we already put a -1 here to
-      // indicate that the object was not retrieved. The caller is not
-      // responsible for releasing this object.
-      DCHECK_EQ(object_buffers[i].data_size, -1);
-      object_buffers[i].data_size = -1;
+      // The object was not retrieved.  The caller can detect this condition
+      // by checking the boolean value of the metadata/data buffers.
+      DCHECK(!object_buffers[i].metadata);
+      DCHECK(!object_buffers[i].data);
     }
   }
   return Status::OK();
 }
 
+Status PlasmaClient::Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
+                         std::vector<ObjectBuffer>* out) {
+  const auto wrap_buffer = [=](const ObjectID& object_id,
+                               const std::shared_ptr<Buffer>& buffer) {
+    return std::make_shared<PlasmaBuffer>(this, object_id, buffer);
+  };
+  const size_t num_objects = object_ids.size();
+  *out = std::vector<ObjectBuffer>(num_objects);
+  return GetBuffers(&object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0]);
+}
+
+Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
+                         int64_t timeout_ms, ObjectBuffer* out) {
+  const auto wrap_buffer = [](const ObjectID& object_id,
+                              const std::shared_ptr<Buffer>& buffer) { return buffer; };
+  return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out);
+}
+
 Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
   auto object_entry = objects_in_use_.find(object_id);
   ARROW_CHECK(object_entry != objects_in_use_.end());
@@ -546,24 +579,26 @@ static inline bool compute_object_hash_parallel(XXH64_state_t* hash_state,
 }
 
 static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
+  DCHECK(obj_buffer.metadata);
+  DCHECK(obj_buffer.data);
   XXH64_state_t hash_state;
   if (obj_buffer.device_num != 0) {
     // TODO(wap): Create cuda program to hash data on gpu.
     return 0;
   }
   XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
-  if (obj_buffer.data_size >= kBytesInMB) {
+  if (obj_buffer.data->size() >= kBytesInMB) {
     compute_object_hash_parallel(
         &hash_state, reinterpret_cast<const unsigned char*>(obj_buffer.data->data()),
-        obj_buffer.data_size);
+        obj_buffer.data->size());
   } else {
     XXH64_update(&hash_state,
                  reinterpret_cast<const unsigned char*>(obj_buffer.data->data()),
-                 obj_buffer.data_size);
+                 obj_buffer.data->size());
   }
   XXH64_update(&hash_state,
                reinterpret_cast<const unsigned char*>(obj_buffer.metadata->data()),
-               obj_buffer.metadata_size);
+               obj_buffer.metadata->size());
   return XXH64_digest(&hash_state);
 }
 
@@ -647,17 +682,16 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
 Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
   // Get the plasma object data. We pass in a timeout of 0 to indicate that
   // the operation should timeout immediately.
-  ObjectBuffer object_buffer;
-  RETURN_NOT_OK(Get(&object_id, 1, 0, &object_buffer));
+  std::vector<ObjectBuffer> object_buffers;
+  RETURN_NOT_OK(Get({object_id}, 0, &object_buffers));
   // If the object was not retrieved, return false.
-  if (object_buffer.data_size == -1) {
+  if (!object_buffers[0].data) {
     return Status::PlasmaObjectNonexistent("Object not found");
   }
   // Compute the hash.
-  uint64_t hash = compute_object_hash(object_buffer);
+  uint64_t hash = compute_object_hash(object_buffers[0]);
   memcpy(digest, &hash, sizeof(hash));
-  // Release the plasma object.
-  return Release(object_id);
+  return Status::OK();
 }
 
 Status PlasmaClient::Subscribe(int* fd) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 7c27c47..dd8175d 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -25,9 +25,11 @@
 #include <memory>
 #include <string>
 #include <unordered_map>
+#include <vector>
 
 #include "arrow/buffer.h"
 #include "arrow/status.h"
+#include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 #include "plasma/common.h"
 #ifdef PLASMA_GPU
@@ -48,12 +50,8 @@ constexpr int64_t kL3CacheSizeBytes = 100000000;
 struct ObjectBuffer {
   /// The data buffer.
   std::shared_ptr<Buffer> data;
-  /// The size in bytes of the data object.
-  int64_t data_size;
   /// The metadata buffer.
   std::shared_ptr<Buffer> metadata;
-  /// The metadata size in bytes.
-  int64_t metadata_size;
   /// The device number.
   int device_num;
 };
@@ -121,31 +119,47 @@ class ARROW_EXPORT PlasmaClient {
   ///        device_num = 1 corresponds to GPU0,
   ///        device_num = 2 corresponds to GPU1, etc.
   /// \return The return status.
-  Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata,
+  ///
+  /// The returned object must be released once the caller is done with it.  It
+  /// must also be either sealed or aborted.
+  Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
                 int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0);
+
   /// Get some objects from the Plasma Store. This function will block until the
   /// objects have all been created and sealed in the Plasma Store or the
-  /// timeout
-  /// expires. The caller is responsible for releasing any retrieved objects,
-  /// but
-  /// the caller should not release objects that were not retrieved.
+  /// timeout expires.
+  ///
+  /// \param object_ids The IDs of the objects to get.
+  /// \param timeout_ms The amount of time in milliseconds to wait before this
+  ///        request times out. If this value is -1, then no timeout is set.
+  /// \param[out] object_buffers The object results.
+  /// \return The return status.
+  ///
+  /// If an object was not retrieved, the corresponding metadata and data
+  /// fields in the ObjectBuffer structure will evaluate to false.
+  /// Objects are automatically released by the client when their buffers
+  /// go out of scope.
+  Status Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
+             std::vector<ObjectBuffer>* object_buffers);
+
+  /// Deprecated variant of Get() that doesn't automatically release buffers
+  /// when they go out of scope.
   ///
   /// \param object_ids The IDs of the objects to get.
   /// \param num_objects The number of object IDs to get.
   /// \param timeout_ms The amount of time in milliseconds to wait before this
   ///        request times out. If this value is -1, then no timeout is set.
-  /// \param object_buffers An array where the results will be stored. If the
-  /// data
-  ///        size field is -1, then the object was not retrieved.
+  /// \param object_buffers An array where the results will be stored.
   /// \return The return status.
+  ///
+  /// The caller is responsible for releasing any retrieved objects, but it
+  /// should not release objects that were not retrieved.
   Status Get(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
              ObjectBuffer* object_buffers);
 
   /// Tell Plasma that the client no longer needs the object. This should be
-  /// called
-  /// after Get when the client is done with the object. After this call,
-  /// the address returned by Get is no longer valid. This should be called
-  /// once for each call to Get (with the same object ID).
+  /// called after Get() or Create() when the client is done with the object.
+  /// After this call, the buffer returned by Get() is no longer valid.
   ///
   /// \param object_id The ID of the object that is no longer needed.
   /// \return The return status.
@@ -328,6 +342,10 @@ class ARROW_EXPORT PlasmaClient {
   int get_manager_fd() const;
 
  private:
+  FRIEND_TEST(TestPlasmaStore, GetTest);
+  FRIEND_TEST(TestPlasmaStore, LegacyGetTest);
+  FRIEND_TEST(TestPlasmaStore, AbortTest);
+
   /// This is a helper method for unmapping objects for which all references have
   /// gone out of scope, either by calling Release or Abort.
   ///
@@ -340,6 +358,14 @@ class ARROW_EXPORT PlasmaClient {
 
   Status PerformRelease(const ObjectID& object_id);
 
+  /// Common helper for Get() variants
+  Status GetBuffers(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
+                    const std::function<std::shared_ptr<Buffer>(
+                        const ObjectID&, const std::shared_ptr<Buffer>&)>& wrap_buffer,
+                    ObjectBuffer* object_buffers);
+
+  bool IsInUse(const ObjectID& object_id);
+
   uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
 
   uint8_t* lookup_mmapped_file(int store_fd_val);
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 07e0f9c..10e4e4f 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -24,6 +24,8 @@
 
 #include <random>
 
+#include "arrow/test-util.h"
+
 #include "plasma/client.h"
 #include "plasma/common.h"
 #include "plasma/plasma.h"
@@ -35,6 +37,13 @@ namespace plasma {
 
 std::string test_executable;  // NOLINT
 
+void AssertObjectBufferEqual(const ObjectBuffer& object_buffer,
+                             const std::vector<uint8_t>& metadata,
+                             const std::vector<uint8_t>& data) {
+  arrow::test::AssertBufferEqual(*object_buffer.metadata, metadata);
+  arrow::test::AssertBufferEqual(*object_buffer.data, data);
+}
+
 class TestPlasmaStore : public ::testing::Test {
  public:
   // TODO(pcm): At the moment, stdout of the test gets mixed up with
@@ -55,10 +64,25 @@ class TestPlasmaStore : public ::testing::Test {
     ARROW_CHECK_OK(
         client2_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
   }
-  virtual void Finish() {
+  virtual void TearDown() {
     ARROW_CHECK_OK(client_.Disconnect());
     ARROW_CHECK_OK(client2_.Disconnect());
-    system("killall plasma_store &");
+    // Kill all plasma_store processes
+    // TODO should only kill the processes we launched
+    system("killall -9 plasma_store");
+  }
+
+  void CreateObject(PlasmaClient& client, const ObjectID& object_id,
+                    const std::vector<uint8_t>& metadata,
+                    const std::vector<uint8_t>& data) {
+    std::shared_ptr<Buffer> data_buffer;
+    ARROW_CHECK_OK(client.Create(object_id, data.size(), &metadata[0], metadata.size(),
+                                 &data_buffer));
+    for (size_t i = 0; i < data.size(); i++) {
+      data_buffer->mutable_data()[i] = data[i];
+    }
+    ARROW_CHECK_OK(client.Seal(object_id));
+    ARROW_CHECK_OK(client.Release(object_id));
   }
 
  protected:
@@ -101,54 +125,87 @@ TEST_F(TestPlasmaStore, ContainsTest) {
 
   // Test for the object being in local Plasma store.
   // First create object.
-  int64_t data_size = 100;
-  uint8_t metadata[] = {5};
-  int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
-  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
-  ARROW_CHECK_OK(client_.Seal(object_id));
+  std::vector<uint8_t> data(100, 0);
+  CreateObject(client_, object_id, {42}, data);
   // Avoid race condition of Plasma Manager waiting for notification.
-  ObjectBuffer object_buffer;
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  std::vector<ObjectBuffer> object_buffers;
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
   ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
 }
 
 TEST_F(TestPlasmaStore, GetTest) {
+  std::vector<ObjectBuffer> object_buffers;
+
   ObjectID object_id = ObjectID::from_random();
-  ObjectBuffer object_buffer;
 
   // Test for object non-existence.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
-  ASSERT_EQ(object_buffer.data_size, -1);
+  ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_FALSE(object_buffers[0].metadata);
+  ASSERT_FALSE(object_buffers[0].data);
+  EXPECT_FALSE(client_.IsInUse(object_id));
 
   // Test for the object being in local Plasma store.
   // First create object.
-  int64_t data_size = 4;
-  uint8_t metadata[] = {5};
-  int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data_buffer;
-  uint8_t* data;
-  ARROW_CHECK_OK(
-      client_.Create(object_id, data_size, metadata, metadata_size, &data_buffer));
-  data = data_buffer->mutable_data();
-  for (int64_t i = 0; i < data_size; i++) {
-    data[i] = static_cast<uint8_t>(i % 4);
+  std::vector<uint8_t> data = {3, 5, 6, 7, 9};
+  CreateObject(client_, object_id, {42}, data);
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_FALSE(client_.IsInUse(object_id));
+
+  object_buffers.clear();
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_EQ(object_buffers[0].device_num, 0);
+  AssertObjectBufferEqual(object_buffers[0], {42}, {3, 5, 6, 7, 9});
+
+  // Metadata keeps object in use
+  {
+    auto metadata = object_buffers[0].metadata;
+    object_buffers.clear();
+    ::arrow::test::AssertBufferEqual(*metadata, {42});
+    ARROW_CHECK_OK(client_.FlushReleaseHistory());
+    EXPECT_TRUE(client_.IsInUse(object_id));
   }
-  ARROW_CHECK_OK(client_.Seal(object_id));
+  // Object is automatically released
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_FALSE(client_.IsInUse(object_id));
+}
 
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
-  const uint8_t* object_data = object_buffer.data->data();
-  for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data[i], object_data[i]);
+TEST_F(TestPlasmaStore, LegacyGetTest) {
+  // Test the old pointer-based Get() variant, which does not auto-release objects.
+  ObjectID object_id = ObjectID::from_random();
+  {
+    ObjectBuffer object_buffer;
+
+    // Test for object non-existence: both buffers stay unset, nothing is in use.
+    ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+    ASSERT_FALSE(object_buffer.metadata);
+    ASSERT_FALSE(object_buffer.data);
+    EXPECT_FALSE(client_.IsInUse(object_id));
+
+    // First create object (CreateObject() also releases it, so it is not in use).
+    std::vector<uint8_t> data = {3, 5, 6, 7, 9};
+    CreateObject(client_, object_id, {42}, data);
+    ARROW_CHECK_OK(client_.FlushReleaseHistory());
+    EXPECT_FALSE(client_.IsInUse(object_id));
+    // Fetch the object and check both its metadata and its data.
+    ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+    AssertObjectBufferEqual(object_buffer, {42}, {3, 5, 6, 7, 9});
   }
+  // object_buffer is destroyed, but the object still needs releasing manually
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_TRUE(client_.IsInUse(object_id));
+  ARROW_CHECK_OK(client_.Release(object_id));
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_FALSE(client_.IsInUse(object_id));
 }
 
 TEST_F(TestPlasmaStore, MultipleGetTest) {
   ObjectID object_id1 = ObjectID::from_random();
   ObjectID object_id2 = ObjectID::from_random();
-  ObjectID object_ids[2] = {object_id1, object_id2};
-  ObjectBuffer object_buffer[2];
+  std::vector<ObjectID> object_ids = {object_id1, object_id2};
+  std::vector<ObjectBuffer> object_buffers;
 
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
@@ -162,18 +219,18 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
   data->mutable_data()[0] = 2;
   ARROW_CHECK_OK(client_.Seal(object_id2));
 
-  ARROW_CHECK_OK(client_.Get(object_ids, 2, -1, object_buffer));
-  ASSERT_EQ(object_buffer[0].data->data()[0], 1);
-  ASSERT_EQ(object_buffer[1].data->data()[0], 2);
+  ARROW_CHECK_OK(client_.Get(object_ids, -1, &object_buffers));
+  ASSERT_EQ(object_buffers[0].data->data()[0], 1);
+  ASSERT_EQ(object_buffers[1].data->data()[0], 2);
 }
 
 TEST_F(TestPlasmaStore, AbortTest) {
   ObjectID object_id = ObjectID::from_random();
-  ObjectBuffer object_buffer;
+  std::vector<ObjectBuffer> object_buffers;
 
   // Test for object non-existence.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
-  ASSERT_EQ(object_buffer.data_size, -1);
+  ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+  ASSERT_FALSE(object_buffers[0].data);
 
   // Test object abort.
   // First create object.
@@ -193,30 +250,29 @@ TEST_F(TestPlasmaStore, AbortTest) {
   ASSERT_TRUE(status.IsInvalid());
   // Release, then abort.
   ARROW_CHECK_OK(client_.Release(object_id));
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_TRUE(client_.IsInUse(object_id));
+
   ARROW_CHECK_OK(client_.Abort(object_id));
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_FALSE(client_.IsInUse(object_id));
 
   // Test for object non-existence after the abort.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
-  ASSERT_EQ(object_buffer.data_size, -1);
+  ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+  ASSERT_FALSE(object_buffers[0].data);
 
   // Create the object successfully this time.
-  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
-  data_ptr = data->mutable_data();
-  for (int64_t i = 0; i < data_size; i++) {
-    data_ptr[i] = static_cast<uint8_t>(i % 4);
-  }
-  ARROW_CHECK_OK(client_.Seal(object_id));
+  CreateObject(client_, object_id, {42, 43}, {1, 2, 3, 4, 5});
 
   // Test that we can get the object.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
-  const uint8_t* buffer_ptr = object_buffer.data->data();
-  for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data_ptr[i], buffer_ptr[i]);
-  }
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+  AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5});
+  ARROW_CHECK_OK(client_.Release(object_id));
 }
 
 TEST_F(TestPlasmaStore, MultipleClientTest) {
   ObjectID object_id = ObjectID::from_random();
+  std::vector<ObjectBuffer> object_buffers;
 
   // Test for object non-existence on the first client.
   bool has_object;
@@ -232,8 +288,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
   ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
   ARROW_CHECK_OK(client2_.Seal(object_id));
   // Test that the first client can get the object.
-  ObjectBuffer object_buffer;
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+  ASSERT_TRUE(object_buffers[0].data);
   ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
 
@@ -245,7 +301,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
   ARROW_CHECK_OK(client_.Disconnect());
   // Test that the second client can seal and get the created object.
   ARROW_CHECK_OK(client2_.Seal(object_id));
-  ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+  ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers));
+  ASSERT_TRUE(object_buffers[0].data);
   ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
 }
@@ -308,42 +365,66 @@ using arrow::gpu::CudaBuffer;
 using arrow::gpu::CudaBufferReader;
 using arrow::gpu::CudaBufferWriter;
 
+namespace {
+
+// Check that a CudaBuffer can be obtained from `buffer` and holds `expected_data`.
+void AssertCudaRead(const std::shared_ptr<Buffer>& buffer,
+                    const std::vector<uint8_t>& expected_data) {
+  std::shared_ptr<CudaBuffer> gpu_buffer;
+  const size_t data_size = expected_data.size();
+  ARROW_CHECK_OK(CudaBuffer::FromBuffer(buffer, &gpu_buffer));
+  ASSERT_EQ(gpu_buffer->size(), data_size);
+
+  CudaBufferReader reader(gpu_buffer);
+  // A runtime-sized array (`uint8_t read_data[data_size]`) is not standard C++.
+  std::vector<uint8_t> read_data(data_size);
+  int64_t read_data_size = 0;
+  ARROW_CHECK_OK(reader.Read(data_size, &read_data_size, read_data.data()));
+  ASSERT_EQ(read_data_size, data_size);
+  for (size_t i = 0; i < data_size; i++) {
+    ASSERT_EQ(read_data[i], expected_data[i]);
+  }
+}
+
+}  // namespace
+
 TEST_F(TestPlasmaStore, GetGPUTest) {
   ObjectID object_id = ObjectID::from_random();
-  ObjectBuffer object_buffer;
+  std::vector<ObjectBuffer> object_buffers;
 
   // Test for object non-existence.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
-  ASSERT_EQ(object_buffer.data_size, -1);
+  ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_FALSE(object_buffers[0].data);
 
   // Test for the object being in local Plasma store.
   // First create object.
   uint8_t data[] = {4, 5, 3, 1};
   int64_t data_size = sizeof(data);
-  uint8_t metadata[] = {5};
+  uint8_t metadata[] = {42};
   int64_t metadata_size = sizeof(metadata);
   std::shared_ptr<Buffer> data_buffer;
   std::shared_ptr<CudaBuffer> gpu_buffer;
   ARROW_CHECK_OK(
       client_.Create(object_id, data_size, metadata, metadata_size, &data_buffer, 1));
-  gpu_buffer = std::dynamic_pointer_cast<CudaBuffer>(data_buffer);
+  ARROW_CHECK_OK(CudaBuffer::FromBuffer(data_buffer, &gpu_buffer));
   CudaBufferWriter writer(gpu_buffer);
-  writer.Write(data, data_size);
+  ARROW_CHECK_OK(writer.Write(data, data_size));
   ARROW_CHECK_OK(client_.Seal(object_id));
 
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
-  gpu_buffer = std::dynamic_pointer_cast<CudaBuffer>(object_buffer.data);
-  CudaBufferReader reader(gpu_buffer);
-  uint8_t read_data[data_size];
-  int64_t read_data_size;
-  reader.Read(data_size, &read_data_size, read_data);
-  for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data[i], read_data[i]);
-  }
+  object_buffers.clear();
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_EQ(object_buffers[0].device_num, 1);
+  // Check data
+  AssertCudaRead(object_buffers[0].data, {4, 5, 3, 1});
+  // Check metadata
+  AssertCudaRead(object_buffers[0].metadata, {42});
 }
 
 TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
   ObjectID object_id = ObjectID::from_random();
+  std::vector<ObjectBuffer> object_buffers;
 
   // Test for object non-existence on the first client.
   bool has_object;
@@ -360,8 +441,7 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
       client2_.Create(object_id, data_size, metadata, metadata_size, &data, 1));
   ARROW_CHECK_OK(client2_.Seal(object_id));
   // Test that the first client can get the object.
-  ObjectBuffer object_buffer;
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
   ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
 
@@ -374,12 +454,16 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
   ARROW_CHECK_OK(client_.Disconnect());
   // Test that the second client can seal and get the created object.
   ARROW_CHECK_OK(client2_.Seal(object_id));
-  ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+  object_buffers.clear();
   ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
+  ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_EQ(object_buffers[0].device_num, 1);
+  AssertCudaRead(object_buffers[0].metadata, {5});
 }
 
-#endif
+#endif  // PLASMA_GPU
 
 }  // namespace plasma
 
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 32f6d18..b99e2b0 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -29,7 +29,7 @@ from cpython.pycapsule cimport *
 import collections
 import pyarrow
 
-from pyarrow.lib cimport Buffer, NativeFile, check_status
+from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer
 from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
                                         CFixedSizeBufferWriter, CStatus)
 
@@ -83,8 +83,8 @@ cdef extern from "plasma/client.h" nogil:
                        const uint8_t* metadata, int64_t metadata_size,
                        const shared_ptr[CBuffer]* data)
 
-        CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
-                    int64_t timeout_ms, CObjectBuffer* object_buffers)
+        CStatus Get(const c_vector[CUniqueID] object_ids, int64_t timeout_ms,
+                    c_vector[CObjectBuffer]* object_buffers)
 
         CStatus Seal(const CUniqueID& object_id)
 
@@ -117,9 +117,7 @@ cdef extern from "plasma/client.h" nogil:
 cdef extern from "plasma/client.h" nogil:
 
     cdef struct CObjectBuffer" plasma::ObjectBuffer":
-        int64_t data_size
         shared_ptr[CBuffer] data
-        int64_t metadata_size
         shared_ptr[CBuffer] metadata
 
 
@@ -239,21 +237,16 @@ cdef class PlasmaClient:
 
     cdef _get_object_buffers(self, object_ids, int64_t timeout_ms,
                              c_vector[CObjectBuffer]* result):
-        cdef c_vector[CUniqueID] ids
-        cdef ObjectID object_id
+        cdef:
+            c_vector[CUniqueID] ids
+            ObjectID object_id
+
         for object_id in object_ids:
             ids.push_back(object_id.data)
-        result[0].resize(ids.size())
         with nogil:
-            check_status(self.client.get().Get(ids.data(), ids.size(),
-                         timeout_ms, result[0].data()))
-
-    cdef _make_plasma_buffer(self, ObjectID object_id,
-                             shared_ptr[CBuffer] buffer, int64_t size):
-        result = PlasmaBuffer(object_id, self)
-        result.init(buffer)
-        return result
+            check_status(self.client.get().Get(ids, timeout_ms, result))
 
+    # XXX C++ API should instead expose some kind of CreateAuto()
     cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data,
                                      int64_t size):
         cdef shared_ptr[CBuffer] buffer
@@ -332,11 +325,8 @@ cdef class PlasmaClient:
         self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
         result = []
         for i in range(object_buffers.size()):
-            if object_buffers[i].data_size != -1:
-                result.append(
-                    self._make_plasma_buffer(object_ids[i],
-                                             object_buffers[i].data,
-                                             object_buffers[i].data_size))
+            if object_buffers[i].data.get() != nullptr:
+                result.append(pyarrow_wrap_buffer(object_buffers[i].data))
             else:
                 result.append(None)
         return result
@@ -367,10 +357,10 @@ cdef class PlasmaClient:
         self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
         result = []
         for i in range(object_buffers.size()):
-            result.append(
-                self._make_plasma_buffer(object_ids[i],
-                                         object_buffers[i].metadata,
-                                         object_buffers[i].metadata_size))
+            if object_buffers[i].metadata.get() != nullptr:
+                result.append(pyarrow_wrap_buffer(object_buffers[i].metadata))
+            else:
+                result.append(None)
         return result
 
     def put(self, object value, ObjectID object_id=None, int memcopy_threads=6,
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 1df213d..4358841 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -190,7 +190,6 @@ class TestPlasmaClient(object):
         plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
         # Connect to Plasma.
         self.plasma_client = plasma.connect(plasma_store_name, "", 64)
-        # For the eviction test
         self.plasma_client2 = plasma.connect(plasma_store_name, "", 0)
 
     def teardown_method(self, test_method):
@@ -311,6 +310,36 @@ class TestPlasmaClient(object):
                 else:
                     assert results[i] is None
 
+    def test_buffer_lifetime(self):
+        # ARROW-2195: data must stay valid after its Plasma buffer is dropped
+        arr = pa.array([1, 12, 23, 3, 34], pa.int32())
+        batch = pa.RecordBatch.from_arrays([arr], ['field1'])
+
+        # Write the batch to a mock stream first, to learn its serialized size
+        sink = pa.MockOutputStream()
+        writer = pa.RecordBatchStreamWriter(sink, batch.schema)
+        writer.write_batch(batch)
+        writer.close()
+        # Serialize RecordBatch into a Plasma object of exactly that size
+        object_id = random_object_id()
+        data_buffer = self.plasma_client.create(object_id, sink.size())
+        stream = pa.FixedSizeBufferWriter(data_buffer)
+        writer = pa.RecordBatchStreamWriter(stream, batch.schema)
+        writer.write_batch(batch)
+        writer.close()
+        self.plasma_client.seal(object_id)
+        del data_buffer
+
+        # Deserialize RecordBatch from Plasma store (through the second client)
+        [data_buffer] = self.plasma_client2.get_buffers([object_id])
+        reader = pa.RecordBatchStreamReader(data_buffer)
+        read_batch = reader.read_next_batch()
+        # Lose reference to returned buffer.  The RecordBatch must still
+        # be backed by valid memory.
+        del data_buffer, reader
+
+        assert read_batch.equals(batch)
+
     def test_put_and_get(self):
         for value in [["hello", "world", 3, 1.0], None, "hello"]:
             object_id = self.plasma_client.put(value)
@@ -770,15 +799,15 @@ class TestPlasmaClient(object):
         # them go out of scope.
         for _ in range(100):
             create_object(
-                self.plasma_client,
+                self.plasma_client2,
                 np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0)
         # Create large objects that require the full object store size, and
         # verify that they fit.
         for _ in range(2):
-            create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY, 0)
+            create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
         # Verify that an object that is too large does not fit.
         with pytest.raises(pa.lib.PlasmaStoreFull):
-            create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY + 1,
+            create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
                           0)
 
 

-- 
To stop receiving notification emails like this one, please contact
pcmoritz@apache.org.