Posted to commits@arrow.apache.org by pc...@apache.org on 2018/04/04 17:52:18 UTC
[arrow] branch master updated: ARROW-2195: [Plasma] Return auto-releasing buffers
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new cf39686 ARROW-2195: [Plasma] Return auto-releasing buffers
cf39686 is described below
commit cf396867df6f1f93948c69ce10ceb0f95e399242
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Apr 4 10:51:51 2018 -0700
ARROW-2195: [Plasma] Return auto-releasing buffers
On the C++ side, add a new PlasmaClient::GetAuto() method to return buffers that release
the corresponding object on destruction.
On the Python side, return such buffers in PlasmaClient.get_buffers().
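For illustration, here is a minimal usage sketch of the auto-releasing variant; in the final diff below it lands as an overload of PlasmaClient::Get() taking a std::vector<ObjectID>. The sketch is not part of the commit, and the connected client and object ID are assumed to already exist:

    // Sketch only: assumes `client` is a plasma::PlasmaClient connected to a
    // running store and `object_id` names an object that may or may not exist.
    #include <memory>
    #include <vector>
    #include "arrow/buffer.h"
    #include "arrow/status.h"
    #include "plasma/client.h"

    arrow::Status ReadObject(plasma::PlasmaClient* client,
                             const plasma::ObjectID& object_id) {
      std::vector<plasma::ObjectBuffer> object_buffers;
      // Wait up to 100 ms for the object to be created and sealed.
      arrow::Status st =
          client->Get({object_id}, /*timeout_ms=*/100, &object_buffers);
      if (!st.ok()) {
        return st;
      }
      if (!object_buffers[0].data) {
        // Not retrieved: the data and metadata buffers evaluate to false.
        return arrow::Status::PlasmaObjectNonexistent("object not found");
      }
      // ... use object_buffers[0].data and object_buffers[0].metadata here ...
      // When object_buffers goes out of scope, the returned buffers release
      // the object automatically; no manual Release() call is needed.
      return arrow::Status::OK();
    }
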
Author: Antoine Pitrou <an...@python.org>
Closes #1807 from pitrou/ARROW-2195-plasma-buffers and squashes the following commits:
e3747cf7 <Antoine Pitrou> Remove XXX comments
ac2d84c3 <Antoine Pitrou> Migrate PlasmaClient::Get()
927a5352 <Antoine Pitrou> Use GetAuto() in plasma.pyx, not Get()
562abc5d <Antoine Pitrou> Use ARROW_UNUSED()
4d33403f <Antoine Pitrou> Move FRIEND_TEST to the Arrow codebase
123ea839 <Antoine Pitrou> Try to fix odd failure
43b377f8 <Antoine Pitrou> Add tests for device_num
dee2f5dd <Antoine Pitrou> Add Python test
30a439d1 <Antoine Pitrou> Cleanups
483bbdbf <Antoine Pitrou> Allow getting back CUDA buffer from buffer
c2af4d36 <Antoine Pitrou> ARROW-2195: Return auto-releasing buffers
---
cpp/src/arrow/gpu/cuda-test.cc | 43 +++++++
cpp/src/arrow/gpu/cuda_memory.cc | 38 +++++-
cpp/src/arrow/gpu/cuda_memory.h | 11 +-
cpp/src/arrow/test-util.h | 8 ++
cpp/src/arrow/util/macros.h | 25 ++++
cpp/src/plasma/client.cc | 146 ++++++++++++++---------
cpp/src/plasma/client.h | 58 ++++++---
cpp/src/plasma/test/client_tests.cc | 226 +++++++++++++++++++++++++-----------
python/pyarrow/plasma.pyx | 40 +++----
python/pyarrow/tests/test_plasma.py | 37 +++++-
10 files changed, 456 insertions(+), 176 deletions(-)
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index ae411c9..04e1f92 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -80,6 +80,49 @@ TEST_F(TestCudaBuffer, CopyFromHost) {
AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
}
+TEST_F(TestCudaBuffer, FromBuffer) {
+ const int64_t kSize = 1000;
+ // Initialize device buffer with random data
+ std::shared_ptr<PoolBuffer> host_buffer;
+ std::shared_ptr<CudaBuffer> device_buffer;
+ ASSERT_OK(context_->Allocate(kSize, &device_buffer));
+ ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
+ ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), 1000));
+ // Sanity check
+ AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
+
+ // Get generic Buffer from device buffer
+ std::shared_ptr<Buffer> buffer;
+ std::shared_ptr<CudaBuffer> result;
+ buffer = std::static_pointer_cast<Buffer>(device_buffer);
+ ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+ ASSERT_EQ(result->size(), kSize);
+ ASSERT_EQ(result->is_mutable(), true);
+ ASSERT_EQ(result->mutable_data(), buffer->mutable_data());
+ AssertCudaBufferEquals(*result, host_buffer->data(), kSize);
+
+ buffer = SliceBuffer(device_buffer, 0, kSize);
+ ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+ ASSERT_EQ(result->size(), kSize);
+ ASSERT_EQ(result->is_mutable(), false);
+ AssertCudaBufferEquals(*result, host_buffer->data(), kSize);
+
+ buffer = SliceMutableBuffer(device_buffer, 0, kSize);
+ ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+ ASSERT_EQ(result->size(), kSize);
+ ASSERT_EQ(result->is_mutable(), true);
+ ASSERT_EQ(result->mutable_data(), buffer->mutable_data());
+ AssertCudaBufferEquals(*result, host_buffer->data(), kSize);
+
+ buffer = SliceMutableBuffer(device_buffer, 3, kSize - 10);
+ buffer = SliceMutableBuffer(buffer, 8, kSize - 20);
+ ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+ ASSERT_EQ(result->size(), kSize - 20);
+ ASSERT_EQ(result->is_mutable(), true);
+ ASSERT_EQ(result->mutable_data(), buffer->mutable_data());
+ AssertCudaBufferEquals(*result, host_buffer->data() + 11, kSize - 20);
+}
+
// IPC only supported on Linux
#if defined(__linux)
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 183cbcb..a245509 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -98,7 +98,34 @@ CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t
: Buffer(parent, offset, size),
context_(parent->context()),
own_data_(false),
- is_ipc_(false) {}
+ is_ipc_(false) {
+ if (parent->is_mutable()) {
+ is_mutable_ = true;
+ mutable_data_ = const_cast<uint8_t*>(data_);
+ }
+}
+
+Status CudaBuffer::FromBuffer(std::shared_ptr<Buffer> buffer,
+ std::shared_ptr<CudaBuffer>* out) {
+ int64_t offset = 0, size = buffer->size();
+ bool is_mutable = buffer->is_mutable();
+ // The original CudaBuffer may have been wrapped in another Buffer
+ // (for example through slicing).
+ while (!(*out = std::dynamic_pointer_cast<CudaBuffer>(buffer))) {
+ const std::shared_ptr<Buffer> parent = buffer->parent();
+ if (!parent) {
+ return Status::TypeError("buffer is not backed by a CudaBuffer");
+ }
+ offset += buffer->data() - parent->data();
+ buffer = parent;
+ }
+ // Re-slice to represent the same memory area
+ if (offset != 0 || (*out)->size() != size || !is_mutable) {
+ *out = std::make_shared<CudaBuffer>(*out, offset, size);
+ (*out)->is_mutable_ = is_mutable;
+ }
+ return Status::OK();
+}
Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
void* out) const {
@@ -129,8 +156,13 @@ CudaHostBuffer::~CudaHostBuffer() {
// ----------------------------------------------------------------------
// CudaBufferReader
-CudaBufferReader::CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer)
- : io::BufferReader(buffer), cuda_buffer_(buffer), context_(buffer->context()) {}
+CudaBufferReader::CudaBufferReader(const std::shared_ptr<Buffer>& buffer)
+ : io::BufferReader(buffer) {
+ if (!CudaBuffer::FromBuffer(buffer, &cuda_buffer_).ok()) {
+ throw std::bad_cast();
+ }
+ context_ = cuda_buffer_->context();
+}
CudaBufferReader::~CudaBufferReader() {}
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 3f3dd2f..7eb8b88 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -46,6 +46,15 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
~CudaBuffer();
+ /// \brief Convert back generic buffer into CudaBuffer
+ /// \param[in] buffer buffer to convert
+ /// \param[out] out conversion result
+ /// \return Status
+ ///
+ /// This function returns an error if the buffer isn't backed by GPU memory
+ static Status FromBuffer(std::shared_ptr<Buffer> buffer,
+ std::shared_ptr<CudaBuffer>* out);
+
/// \brief Copy memory from GPU device to CPU host
/// \param[out] out a pre-allocated output buffer
/// \return Status
@@ -123,7 +132,7 @@ class ARROW_EXPORT CudaIpcMemHandle {
/// able to do anything other than pointer arithmetic on the returned buffers
class ARROW_EXPORT CudaBufferReader : public io::BufferReader {
public:
- explicit CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer);
+ explicit CudaBufferReader(const std::shared_ptr<Buffer>& buffer);
~CudaBufferReader();
/// \brief Read bytes into pre-allocated host memory
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index fdd42a6..58f82b3 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -324,6 +324,14 @@ void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual
}
}
+void AssertBufferEqual(const Buffer& buffer, const std::vector<uint8_t>& expected) {
+ ASSERT_EQ(buffer.size(), expected.size());
+ const uint8_t* buffer_data = buffer.data();
+ for (size_t i = 0; i < expected.size(); ++i) {
+ ASSERT_EQ(buffer_data[i], expected[i]);
+ }
+}
+
void PrintColumn(const Column& col, std::stringstream* ss) {
const ChunkedArray& carr = *col.data();
for (int i = 0; i < carr.num_chunks(); ++i) {
diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h
index 8b1125d..d900256 100644
--- a/cpp/src/arrow/util/macros.h
+++ b/cpp/src/arrow/util/macros.h
@@ -94,4 +94,29 @@
#endif
#endif // !defined(MANUALLY_ALIGNED_STRUCT)
+// ----------------------------------------------------------------------
+// From googletest
+// (also in parquet-cpp)
+
+// When you need to test the private or protected members of a class,
+// use the FRIEND_TEST macro to declare your tests as friends of the
+// class. For example:
+//
+// class MyClass {
+// private:
+// void MyMethod();
+// FRIEND_TEST(MyClassTest, MyMethod);
+// };
+//
+// class MyClassTest : public testing::Test {
+// // ...
+// };
+//
+// TEST_F(MyClassTest, MyMethod) {
+// // Can call MyClass::MyMethod() here.
+// }
+
+#define FRIEND_TEST(test_case_name, test_name) \
+ friend class test_case_name##_##test_name##_Test
+
#endif // ARROW_UTIL_MACROS_H
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index a9bbd8c..9635e70 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -72,6 +72,27 @@ constexpr int64_t kThreadPoolSize = 8;
constexpr int64_t kBytesInMB = 1 << 20;
static std::vector<std::thread> threadpool_(kThreadPoolSize);
+/// A Buffer class that automatically releases the backing plasma object
+/// when it goes out of scope.
+class PlasmaBuffer : public Buffer {
+ public:
+ ~PlasmaBuffer();
+
+ PlasmaBuffer(PlasmaClient* client, const ObjectID& object_id,
+ const std::shared_ptr<Buffer>& buffer)
+ : Buffer(buffer, 0, buffer->size()), client_(client), object_id_(object_id) {
+ if (buffer->is_mutable()) {
+ is_mutable_ = true;
+ }
+ }
+
+ private:
+ PlasmaClient* client_;
+ ObjectID object_id_;
+};
+
+PlasmaBuffer::~PlasmaBuffer() { ARROW_UNUSED(client_->Release(object_id_)); }
+
struct ObjectInUseEntry {
/// A count of the number of times this client has called PlasmaClient::Create
/// or
@@ -144,6 +165,11 @@ uint8_t* PlasmaClient::lookup_mmapped_file(int store_fd_val) {
return entry->second.pointer;
}
+bool PlasmaClient::IsInUse(const ObjectID& object_id) {
+ const auto elem = objects_in_use_.find(object_id);
+ return (elem != objects_in_use_.end());
+}
+
void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObject* object,
bool is_sealed) {
// Increment the count of the object to track the fact that it is being used.
@@ -182,7 +208,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec
}
Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
- uint8_t* metadata, int64_t metadata_size,
+ const uint8_t* metadata, int64_t metadata_size,
std::shared_ptr<Buffer>* data, int device_num) {
ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
<< data_size << " and metadata size " << metadata_size;
@@ -247,49 +273,45 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
return Status::OK();
}
-Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
- int64_t timeout_ms, ObjectBuffer* object_buffers) {
+Status PlasmaClient::GetBuffers(
+ const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
+ const std::function<std::shared_ptr<Buffer>(
+ const ObjectID&, const std::shared_ptr<Buffer>&)>& wrap_buffer,
+ ObjectBuffer* object_buffers) {
// Fill out the info for the objects that are already in use locally.
bool all_present = true;
- for (int i = 0; i < num_objects; ++i) {
+ for (int64_t i = 0; i < num_objects; ++i) {
auto object_entry = objects_in_use_.find(object_ids[i]);
if (object_entry == objects_in_use_.end()) {
// This object is not currently in use by this client, so we need to send
// a request to the store.
all_present = false;
- // Make a note to ourselves that the object is not present.
- object_buffers[i].data_size = -1;
} else {
// NOTE: If the object is still unsealed, we will deadlock, since we must
// have been the one who created it.
ARROW_CHECK(object_entry->second->is_sealed)
<< "Plasma client called get on an unsealed object that it created";
PlasmaObject* object = &object_entry->second->object;
+ std::shared_ptr<Buffer> physical_buf;
+
if (object->device_num == 0) {
uint8_t* data = lookup_mmapped_file(object->store_fd);
- object_buffers[i].data =
- std::make_shared<Buffer>(data + object->data_offset, object->data_size);
- object_buffers[i].metadata = std::make_shared<Buffer>(
- data + object->data_offset + object->data_size, object->metadata_size);
+ physical_buf = std::make_shared<Buffer>(
+ data + object->data_offset, object->data_size + object->metadata_size);
} else {
#ifdef PLASMA_GPU
- std::shared_ptr<CudaBuffer> gpu_handle =
- gpu_object_map.find(object_ids[i])->second->ptr;
- object_buffers[i].data =
- std::make_shared<CudaBuffer>(gpu_handle, 0, object->data_size);
- object_buffers[i].metadata = std::make_shared<CudaBuffer>(
- gpu_handle, object->data_size, object->metadata_size);
+ physical_buf = gpu_object_map.find(object_ids[i])->second->ptr;
#else
ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
#endif
}
- object_buffers[i].data_size = object->data_size;
- object_buffers[i].metadata_size = object->metadata_size;
+ physical_buf = wrap_buffer(object_ids[i], physical_buf);
+ object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size);
+ object_buffers[i].metadata =
+ SliceBuffer(physical_buf, object->data_size, object->metadata_size);
object_buffers[i].device_num = object->device_num;
// Increment the count of the number of instances of this object that this
- // client is using. A call to PlasmaClient::Release is required to
- // decrement this
- // count. Cache the reference to the object.
+ // client is using. Cache the reference to the object.
increment_object_count(object_ids[i], object, true);
}
}
@@ -300,7 +322,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
// If we get here, then the objects aren't all currently in use by this
// client, so we need to send a request to the plasma store.
- RETURN_NOT_OK(SendGetRequest(store_conn_, object_ids, num_objects, timeout_ms));
+ RETURN_NOT_OK(SendGetRequest(store_conn_, &object_ids[0], num_objects, timeout_ms));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaGetReply, &buffer));
std::vector<ObjectID> received_object_ids(num_objects);
@@ -320,10 +342,10 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
lookup_or_mmap(fd, store_fds[i], mmap_sizes[i]);
}
- for (int i = 0; i < num_objects; ++i) {
+ for (int64_t i = 0; i < num_objects; ++i) {
DCHECK(received_object_ids[i] == object_ids[i]);
object = &object_data[i];
- if (object_buffers[i].data_size != -1) {
+ if (object_buffers[i].data) {
// If the object was already in use by the client, then the store should
// have returned it.
DCHECK_NE(object->data_size, -1);
@@ -334,56 +356,67 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
// If we are here, the object was not currently in use, so we need to
// process the reply from the object store.
if (object->data_size != -1) {
+ std::shared_ptr<Buffer> physical_buf;
if (object->device_num == 0) {
uint8_t* data = lookup_mmapped_file(object->store_fd);
- // Finish filling out the return values.
- object_buffers[i].data =
- std::make_shared<Buffer>(data + object->data_offset, object->data_size);
- object_buffers[i].metadata = std::make_shared<Buffer>(
- data + object->data_offset + object->data_size, object->metadata_size);
+ physical_buf = std::make_shared<Buffer>(
+ data + object->data_offset, object->data_size + object->metadata_size);
} else {
#ifdef PLASMA_GPU
std::lock_guard<std::mutex> lock(gpu_mutex);
auto handle = gpu_object_map.find(object_ids[i]);
- std::shared_ptr<CudaBuffer> gpu_handle;
if (handle == gpu_object_map.end()) {
std::shared_ptr<CudaContext> context;
RETURN_NOT_OK(manager_->GetContext(object->device_num - 1, &context));
GpuProcessHandle* obj_handle = new GpuProcessHandle();
RETURN_NOT_OK(context->OpenIpcBuffer(*object->ipc_handle, &obj_handle->ptr));
gpu_object_map[object_ids[i]] = obj_handle;
- gpu_handle = obj_handle->ptr;
+ physical_buf = obj_handle->ptr;
} else {
handle->second->client_count += 1;
- gpu_handle = handle->second->ptr;
+ physical_buf = handle->second->ptr;
}
- object_buffers[i].data =
- std::make_shared<CudaBuffer>(gpu_handle, 0, object->data_size);
- object_buffers[i].metadata = std::make_shared<CudaBuffer>(
- gpu_handle, object->data_size, object->metadata_size);
#else
ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
#endif
}
- object_buffers[i].data_size = object->data_size;
- object_buffers[i].metadata_size = object->metadata_size;
+ // Finish filling out the return values.
+ physical_buf = wrap_buffer(object_ids[i], physical_buf);
+ object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size);
+ object_buffers[i].metadata =
+ SliceBuffer(physical_buf, object->data_size, object->metadata_size);
object_buffers[i].device_num = object->device_num;
// Increment the count of the number of instances of this object that this
- // client is using. A call to PlasmaClient::Release is required to
- // decrement this
- // count. Cache the reference to the object.
+ // client is using. Cache the reference to the object.
increment_object_count(received_object_ids[i], object, true);
} else {
- // The object was not retrieved. Make sure we already put a -1 here to
- // indicate that the object was not retrieved. The caller is not
- // responsible for releasing this object.
- DCHECK_EQ(object_buffers[i].data_size, -1);
- object_buffers[i].data_size = -1;
+ // The object was not retrieved. The caller can detect this condition
+ // by checking the boolean value of the metadata/data buffers.
+ DCHECK(!object_buffers[i].metadata);
+ DCHECK(!object_buffers[i].data);
}
}
return Status::OK();
}
+Status PlasmaClient::Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
+ std::vector<ObjectBuffer>* out) {
+ const auto wrap_buffer = [=](const ObjectID& object_id,
+ const std::shared_ptr<Buffer>& buffer) {
+ return std::make_shared<PlasmaBuffer>(this, object_id, buffer);
+ };
+ const size_t num_objects = object_ids.size();
+ *out = std::vector<ObjectBuffer>(num_objects);
+ return GetBuffers(&object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0]);
+}
+
+Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
+ int64_t timeout_ms, ObjectBuffer* out) {
+ const auto wrap_buffer = [](const ObjectID& object_id,
+ const std::shared_ptr<Buffer>& buffer) { return buffer; };
+ return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out);
+}
+
Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end());
@@ -546,24 +579,26 @@ static inline bool compute_object_hash_parallel(XXH64_state_t* hash_state,
}
static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
+ DCHECK(obj_buffer.metadata);
+ DCHECK(obj_buffer.data);
XXH64_state_t hash_state;
if (obj_buffer.device_num != 0) {
// TODO(wap): Create cuda program to hash data on gpu.
return 0;
}
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
- if (obj_buffer.data_size >= kBytesInMB) {
+ if (obj_buffer.data->size() >= kBytesInMB) {
compute_object_hash_parallel(
&hash_state, reinterpret_cast<const unsigned char*>(obj_buffer.data->data()),
- obj_buffer.data_size);
+ obj_buffer.data->size());
} else {
XXH64_update(&hash_state,
reinterpret_cast<const unsigned char*>(obj_buffer.data->data()),
- obj_buffer.data_size);
+ obj_buffer.data->size());
}
XXH64_update(&hash_state,
reinterpret_cast<const unsigned char*>(obj_buffer.metadata->data()),
- obj_buffer.metadata_size);
+ obj_buffer.metadata->size());
return XXH64_digest(&hash_state);
}
@@ -647,17 +682,16 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
// Get the plasma object data. We pass in a timeout of 0 to indicate that
// the operation should timeout immediately.
- ObjectBuffer object_buffer;
- RETURN_NOT_OK(Get(&object_id, 1, 0, &object_buffer));
+ std::vector<ObjectBuffer> object_buffers;
+ RETURN_NOT_OK(Get({object_id}, 0, &object_buffers));
// If the object was not retrieved, return false.
- if (object_buffer.data_size == -1) {
+ if (!object_buffers[0].data) {
return Status::PlasmaObjectNonexistent("Object not found");
}
// Compute the hash.
- uint64_t hash = compute_object_hash(object_buffer);
+ uint64_t hash = compute_object_hash(object_buffers[0]);
memcpy(digest, &hash, sizeof(hash));
- // Release the plasma object.
- return Release(object_id);
+ return Status::OK();
}
Status PlasmaClient::Subscribe(int* fd) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 7c27c47..dd8175d 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -25,9 +25,11 @@
#include <memory>
#include <string>
#include <unordered_map>
+#include <vector>
#include "arrow/buffer.h"
#include "arrow/status.h"
+#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
#include "plasma/common.h"
#ifdef PLASMA_GPU
@@ -48,12 +50,8 @@ constexpr int64_t kL3CacheSizeBytes = 100000000;
struct ObjectBuffer {
/// The data buffer.
std::shared_ptr<Buffer> data;
- /// The size in bytes of the data object.
- int64_t data_size;
/// The metadata buffer.
std::shared_ptr<Buffer> metadata;
- /// The metadata size in bytes.
- int64_t metadata_size;
/// The device number.
int device_num;
};
@@ -121,31 +119,47 @@ class ARROW_EXPORT PlasmaClient {
/// device_num = 1 corresponds to GPU0,
/// device_num = 2 corresponds to GPU1, etc.
/// \return The return status.
- Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata,
+ ///
+ /// The returned object must be released once it is done with. It must also
+ /// be either sealed or aborted.
+ Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0);
+
/// Get some objects from the Plasma Store. This function will block until the
/// objects have all been created and sealed in the Plasma Store or the
- /// timeout
- /// expires. The caller is responsible for releasing any retrieved objects,
- /// but
- /// the caller should not release objects that were not retrieved.
+ /// timeout expires.
+ ///
+ /// \param object_ids The IDs of the objects to get.
+ /// \param timeout_ms The amount of time in milliseconds to wait before this
+ /// request times out. If this value is -1, then no timeout is set.
+ /// \param[out] object_buffers The object results.
+ /// \return The return status.
+ ///
+ /// If an object was not retrieved, the corresponding metadata and data
+ /// fields in the ObjectBuffer structure will evaluate to false.
+ /// Objects are automatically released by the client when their buffers
+ /// get out of scope.
+ Status Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
+ std::vector<ObjectBuffer>* object_buffers);
+
+ /// Deprecated variant of Get() that doesn't automatically release buffers
+ /// when they get out of scope.
///
/// \param object_ids The IDs of the objects to get.
/// \param num_objects The number of object IDs to get.
/// \param timeout_ms The amount of time in milliseconds to wait before this
/// request times out. If this value is -1, then no timeout is set.
- /// \param object_buffers An array where the results will be stored. If the
- /// data
- /// size field is -1, then the object was not retrieved.
+ /// \param object_buffers An array where the results will be stored.
/// \return The return status.
+ ///
+ /// The caller is responsible for releasing any retrieved objects, but it
+ /// should not release objects that were not retrieved.
Status Get(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
ObjectBuffer* object_buffers);
/// Tell Plasma that the client no longer needs the object. This should be
- /// called
- /// after Get when the client is done with the object. After this call,
- /// the address returned by Get is no longer valid. This should be called
- /// once for each call to Get (with the same object ID).
+ /// called after Get() or Create() when the client is done with the object.
+ /// After this call, the buffer returned by Get() is no longer valid.
///
/// \param object_id The ID of the object that is no longer needed.
/// \return The return status.
@@ -328,6 +342,10 @@ class ARROW_EXPORT PlasmaClient {
int get_manager_fd() const;
private:
+ FRIEND_TEST(TestPlasmaStore, GetTest);
+ FRIEND_TEST(TestPlasmaStore, LegacyGetTest);
+ FRIEND_TEST(TestPlasmaStore, AbortTest);
+
/// This is a helper method for unmapping objects for which all references have
/// gone out of scope, either by calling Release or Abort.
///
@@ -340,6 +358,14 @@ class ARROW_EXPORT PlasmaClient {
Status PerformRelease(const ObjectID& object_id);
+ /// Common helper for Get() variants
+ Status GetBuffers(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
+ const std::function<std::shared_ptr<Buffer>(
+ const ObjectID&, const std::shared_ptr<Buffer>&)>& wrap_buffer,
+ ObjectBuffer* object_buffers);
+
+ bool IsInUse(const ObjectID& object_id);
+
uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
uint8_t* lookup_mmapped_file(int store_fd_val);
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 07e0f9c..10e4e4f 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -24,6 +24,8 @@
#include <random>
+#include "arrow/test-util.h"
+
#include "plasma/client.h"
#include "plasma/common.h"
#include "plasma/plasma.h"
@@ -35,6 +37,13 @@ namespace plasma {
std::string test_executable; // NOLINT
+void AssertObjectBufferEqual(const ObjectBuffer& object_buffer,
+ const std::vector<uint8_t>& metadata,
+ const std::vector<uint8_t>& data) {
+ arrow::test::AssertBufferEqual(*object_buffer.metadata, metadata);
+ arrow::test::AssertBufferEqual(*object_buffer.data, data);
+}
+
class TestPlasmaStore : public ::testing::Test {
public:
// TODO(pcm): At the moment, stdout of the test gets mixed up with
@@ -55,10 +64,25 @@ class TestPlasmaStore : public ::testing::Test {
ARROW_CHECK_OK(
client2_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
}
- virtual void Finish() {
+ virtual void TearDown() {
ARROW_CHECK_OK(client_.Disconnect());
ARROW_CHECK_OK(client2_.Disconnect());
- system("killall plasma_store &");
+ // Kill all plasma_store processes
+ // TODO should only kill the processes we launched
+ system("killall -9 plasma_store");
+ }
+
+ void CreateObject(PlasmaClient& client, const ObjectID& object_id,
+ const std::vector<uint8_t>& metadata,
+ const std::vector<uint8_t>& data) {
+ std::shared_ptr<Buffer> data_buffer;
+ ARROW_CHECK_OK(client.Create(object_id, data.size(), &metadata[0], metadata.size(),
+ &data_buffer));
+ for (size_t i = 0; i < data.size(); i++) {
+ data_buffer->mutable_data()[i] = data[i];
+ }
+ ARROW_CHECK_OK(client.Seal(object_id));
+ ARROW_CHECK_OK(client.Release(object_id));
}
protected:
@@ -101,54 +125,87 @@ TEST_F(TestPlasmaStore, ContainsTest) {
// Test for the object being in local Plasma store.
// First create object.
- int64_t data_size = 100;
- uint8_t metadata[] = {5};
- int64_t metadata_size = sizeof(metadata);
- std::shared_ptr<Buffer> data;
- ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
- ARROW_CHECK_OK(client_.Seal(object_id));
+ std::vector<uint8_t> data(100, 0);
+ CreateObject(client_, object_id, {42}, data);
// Avoid race condition of Plasma Manager waiting for notification.
- ObjectBuffer object_buffer;
- ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ std::vector<ObjectBuffer> object_buffers;
+ ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
ASSERT_EQ(has_object, true);
}
TEST_F(TestPlasmaStore, GetTest) {
+ std::vector<ObjectBuffer> object_buffers;
+
ObjectID object_id = ObjectID::from_random();
- ObjectBuffer object_buffer;
// Test for object non-existence.
- ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
- ASSERT_EQ(object_buffer.data_size, -1);
+ ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+ ASSERT_EQ(object_buffers.size(), 1);
+ ASSERT_FALSE(object_buffers[0].metadata);
+ ASSERT_FALSE(object_buffers[0].data);
+ EXPECT_FALSE(client_.IsInUse(object_id));
// Test for the object being in local Plasma store.
// First create object.
- int64_t data_size = 4;
- uint8_t metadata[] = {5};
- int64_t metadata_size = sizeof(metadata);
- std::shared_ptr<Buffer> data_buffer;
- uint8_t* data;
- ARROW_CHECK_OK(
- client_.Create(object_id, data_size, metadata, metadata_size, &data_buffer));
- data = data_buffer->mutable_data();
- for (int64_t i = 0; i < data_size; i++) {
- data[i] = static_cast<uint8_t>(i % 4);
+ std::vector<uint8_t> data = {3, 5, 6, 7, 9};
+ CreateObject(client_, object_id, {42}, data);
+ ARROW_CHECK_OK(client_.FlushReleaseHistory());
+ EXPECT_FALSE(client_.IsInUse(object_id));
+
+ object_buffers.clear();
+ ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+ ASSERT_EQ(object_buffers.size(), 1);
+ ASSERT_EQ(object_buffers[0].device_num, 0);
+ AssertObjectBufferEqual(object_buffers[0], {42}, {3, 5, 6, 7, 9});
+
+ // Metadata keeps object in use
+ {
+ auto metadata = object_buffers[0].metadata;
+ object_buffers.clear();
+ ::arrow::test::AssertBufferEqual(*metadata, {42});
+ ARROW_CHECK_OK(client_.FlushReleaseHistory());
+ EXPECT_TRUE(client_.IsInUse(object_id));
}
- ARROW_CHECK_OK(client_.Seal(object_id));
+ // Object is automatically released
+ ARROW_CHECK_OK(client_.FlushReleaseHistory());
+ EXPECT_FALSE(client_.IsInUse(object_id));
+}
- ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
- const uint8_t* object_data = object_buffer.data->data();
- for (int64_t i = 0; i < data_size; i++) {
- ASSERT_EQ(data[i], object_data[i]);
+TEST_F(TestPlasmaStore, LegacyGetTest) {
+ // Test for old non-releasing Get() variant
+ ObjectID object_id = ObjectID::from_random();
+ {
+ ObjectBuffer object_buffer;
+
+ // Test for object non-existence.
+ ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+ ASSERT_FALSE(object_buffer.metadata);
+ ASSERT_FALSE(object_buffer.data);
+ EXPECT_FALSE(client_.IsInUse(object_id));
+
+ // First create object.
+ std::vector<uint8_t> data = {3, 5, 6, 7, 9};
+ CreateObject(client_, object_id, {42}, data);
+ ARROW_CHECK_OK(client_.FlushReleaseHistory());
+ EXPECT_FALSE(client_.IsInUse(object_id));
+
+ ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ AssertObjectBufferEqual(object_buffer, {42}, {3, 5, 6, 7, 9});
}
+ // Object needs releasing manually
+ ARROW_CHECK_OK(client_.FlushReleaseHistory());
+ EXPECT_TRUE(client_.IsInUse(object_id));
+ ARROW_CHECK_OK(client_.Release(object_id));
+ ARROW_CHECK_OK(client_.FlushReleaseHistory());
+ EXPECT_FALSE(client_.IsInUse(object_id));
}
TEST_F(TestPlasmaStore, MultipleGetTest) {
ObjectID object_id1 = ObjectID::from_random();
ObjectID object_id2 = ObjectID::from_random();
- ObjectID object_ids[2] = {object_id1, object_id2};
- ObjectBuffer object_buffer[2];
+ std::vector<ObjectID> object_ids = {object_id1, object_id2};
+ std::vector<ObjectBuffer> object_buffers;
int64_t data_size = 4;
uint8_t metadata[] = {5};
@@ -162,18 +219,18 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
data->mutable_data()[0] = 2;
ARROW_CHECK_OK(client_.Seal(object_id2));
- ARROW_CHECK_OK(client_.Get(object_ids, 2, -1, object_buffer));
- ASSERT_EQ(object_buffer[0].data->data()[0], 1);
- ASSERT_EQ(object_buffer[1].data->data()[0], 2);
+ ARROW_CHECK_OK(client_.Get(object_ids, -1, &object_buffers));
+ ASSERT_EQ(object_buffers[0].data->data()[0], 1);
+ ASSERT_EQ(object_buffers[1].data->data()[0], 2);
}
TEST_F(TestPlasmaStore, AbortTest) {
ObjectID object_id = ObjectID::from_random();
- ObjectBuffer object_buffer;
+ std::vector<ObjectBuffer> object_buffers;
// Test for object non-existence.
- ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
- ASSERT_EQ(object_buffer.data_size, -1);
+ ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+ ASSERT_FALSE(object_buffers[0].data);
// Test object abort.
// First create object.
@@ -193,30 +250,29 @@ TEST_F(TestPlasmaStore, AbortTest) {
ASSERT_TRUE(status.IsInvalid());
// Release, then abort.
ARROW_CHECK_OK(client_.Release(object_id));
+ ARROW_CHECK_OK(client_.FlushReleaseHistory());
+ EXPECT_TRUE(client_.IsInUse(object_id));
+
ARROW_CHECK_OK(client_.Abort(object_id));
+ ARROW_CHECK_OK(client_.FlushReleaseHistory());
+ EXPECT_FALSE(client_.IsInUse(object_id));
// Test for object non-existence after the abort.
- ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
- ASSERT_EQ(object_buffer.data_size, -1);
+ ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+ ASSERT_FALSE(object_buffers[0].data);
// Create the object successfully this time.
- ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
- data_ptr = data->mutable_data();
- for (int64_t i = 0; i < data_size; i++) {
- data_ptr[i] = static_cast<uint8_t>(i % 4);
- }
- ARROW_CHECK_OK(client_.Seal(object_id));
+ CreateObject(client_, object_id, {42, 43}, {1, 2, 3, 4, 5});
// Test that we can get the object.
- ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
- const uint8_t* buffer_ptr = object_buffer.data->data();
- for (int64_t i = 0; i < data_size; i++) {
- ASSERT_EQ(data_ptr[i], buffer_ptr[i]);
- }
+ ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+ AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5});
+ ARROW_CHECK_OK(client_.Release(object_id));
}
TEST_F(TestPlasmaStore, MultipleClientTest) {
ObjectID object_id = ObjectID::from_random();
+ std::vector<ObjectBuffer> object_buffers;
// Test for object non-existence on the first client.
bool has_object;
@@ -232,8 +288,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client2_.Seal(object_id));
// Test that the first client can get the object.
- ObjectBuffer object_buffer;
- ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+ ASSERT_TRUE(object_buffers[0].data);
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
ASSERT_EQ(has_object, true);
@@ -245,7 +301,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
ARROW_CHECK_OK(client_.Disconnect());
// Test that the second client can seal and get the created object.
ARROW_CHECK_OK(client2_.Seal(object_id));
- ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+ ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers));
+ ASSERT_TRUE(object_buffers[0].data);
ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
ASSERT_EQ(has_object, true);
}
@@ -308,42 +365,66 @@ using arrow::gpu::CudaBuffer;
using arrow::gpu::CudaBufferReader;
using arrow::gpu::CudaBufferWriter;
+namespace {
+
+void AssertCudaRead(const std::shared_ptr<Buffer>& buffer,
+ const std::vector<uint8_t>& expected_data) {
+ std::shared_ptr<CudaBuffer> gpu_buffer;
+ const size_t data_size = expected_data.size();
+
+ ARROW_CHECK_OK(CudaBuffer::FromBuffer(buffer, &gpu_buffer));
+ ASSERT_EQ(gpu_buffer->size(), data_size);
+
+ CudaBufferReader reader(gpu_buffer);
+ uint8_t read_data[data_size];
+ int64_t read_data_size;
+ ARROW_CHECK_OK(reader.Read(data_size, &read_data_size, read_data));
+ ASSERT_EQ(read_data_size, data_size);
+
+ for (size_t i = 0; i < data_size; i++) {
+ ASSERT_EQ(read_data[i], expected_data[i]);
+ }
+}
+
+} // namespace
+
TEST_F(TestPlasmaStore, GetGPUTest) {
ObjectID object_id = ObjectID::from_random();
- ObjectBuffer object_buffer;
+ std::vector<ObjectBuffer> object_buffers;
// Test for object non-existence.
- ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
- ASSERT_EQ(object_buffer.data_size, -1);
+ ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+ ASSERT_EQ(object_buffers.size(), 1);
+ ASSERT_FALSE(object_buffers[0].data);
// Test for the object being in local Plasma store.
// First create object.
uint8_t data[] = {4, 5, 3, 1};
int64_t data_size = sizeof(data);
- uint8_t metadata[] = {5};
+ uint8_t metadata[] = {42};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data_buffer;
std::shared_ptr<CudaBuffer> gpu_buffer;
ARROW_CHECK_OK(
client_.Create(object_id, data_size, metadata, metadata_size, &data_buffer, 1));
- gpu_buffer = std::dynamic_pointer_cast<CudaBuffer>(data_buffer);
+ ARROW_CHECK_OK(CudaBuffer::FromBuffer(data_buffer, &gpu_buffer));
CudaBufferWriter writer(gpu_buffer);
- writer.Write(data, data_size);
+ ARROW_CHECK_OK(writer.Write(data, data_size));
ARROW_CHECK_OK(client_.Seal(object_id));
- ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
- gpu_buffer = std::dynamic_pointer_cast<CudaBuffer>(object_buffer.data);
- CudaBufferReader reader(gpu_buffer);
- uint8_t read_data[data_size];
- int64_t read_data_size;
- reader.Read(data_size, &read_data_size, read_data);
- for (int64_t i = 0; i < data_size; i++) {
- ASSERT_EQ(data[i], read_data[i]);
- }
+ object_buffers.clear();
+ ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+ ASSERT_EQ(object_buffers.size(), 1);
+ ASSERT_EQ(object_buffers[0].device_num, 1);
+ // Check data
+ AssertCudaRead(object_buffers[0].data, {4, 5, 3, 1});
+ // Check metadata
+ AssertCudaRead(object_buffers[0].metadata, {42});
}
TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
ObjectID object_id = ObjectID::from_random();
+ std::vector<ObjectBuffer> object_buffers;
// Test for object non-existence on the first client.
bool has_object;
@@ -360,8 +441,7 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
client2_.Create(object_id, data_size, metadata, metadata_size, &data, 1));
ARROW_CHECK_OK(client2_.Seal(object_id));
// Test that the first client can get the object.
- ObjectBuffer object_buffer;
- ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
ASSERT_EQ(has_object, true);
@@ -374,12 +454,16 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
ARROW_CHECK_OK(client_.Disconnect());
// Test that the second client can seal and get the created object.
ARROW_CHECK_OK(client2_.Seal(object_id));
- ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+ object_buffers.clear();
ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
ASSERT_EQ(has_object, true);
+ ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers));
+ ASSERT_EQ(object_buffers.size(), 1);
+ ASSERT_EQ(object_buffers[0].device_num, 1);
+ AssertCudaRead(object_buffers[0].metadata, {5});
}
-#endif
+#endif // PLASMA_GPU
} // namespace plasma
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 32f6d18..b99e2b0 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -29,7 +29,7 @@ from cpython.pycapsule cimport *
import collections
import pyarrow
-from pyarrow.lib cimport Buffer, NativeFile, check_status
+from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer
from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
CFixedSizeBufferWriter, CStatus)
@@ -83,8 +83,8 @@ cdef extern from "plasma/client.h" nogil:
const uint8_t* metadata, int64_t metadata_size,
const shared_ptr[CBuffer]* data)
- CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
- int64_t timeout_ms, CObjectBuffer* object_buffers)
+ CStatus Get(const c_vector[CUniqueID] object_ids, int64_t timeout_ms,
+ c_vector[CObjectBuffer]* object_buffers)
CStatus Seal(const CUniqueID& object_id)
@@ -117,9 +117,7 @@ cdef extern from "plasma/client.h" nogil:
cdef extern from "plasma/client.h" nogil:
cdef struct CObjectBuffer" plasma::ObjectBuffer":
- int64_t data_size
shared_ptr[CBuffer] data
- int64_t metadata_size
shared_ptr[CBuffer] metadata
@@ -239,21 +237,16 @@ cdef class PlasmaClient:
cdef _get_object_buffers(self, object_ids, int64_t timeout_ms,
c_vector[CObjectBuffer]* result):
- cdef c_vector[CUniqueID] ids
- cdef ObjectID object_id
+ cdef:
+ c_vector[CUniqueID] ids
+ ObjectID object_id
+
for object_id in object_ids:
ids.push_back(object_id.data)
- result[0].resize(ids.size())
with nogil:
- check_status(self.client.get().Get(ids.data(), ids.size(),
- timeout_ms, result[0].data()))
-
- cdef _make_plasma_buffer(self, ObjectID object_id,
- shared_ptr[CBuffer] buffer, int64_t size):
- result = PlasmaBuffer(object_id, self)
- result.init(buffer)
- return result
+ check_status(self.client.get().Get(ids, timeout_ms, result))
+ # XXX C++ API should instead expose some kind of CreateAuto()
cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data,
int64_t size):
cdef shared_ptr[CBuffer] buffer
@@ -332,11 +325,8 @@ cdef class PlasmaClient:
self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
result = []
for i in range(object_buffers.size()):
- if object_buffers[i].data_size != -1:
- result.append(
- self._make_plasma_buffer(object_ids[i],
- object_buffers[i].data,
- object_buffers[i].data_size))
+ if object_buffers[i].data.get() != nullptr:
+ result.append(pyarrow_wrap_buffer(object_buffers[i].data))
else:
result.append(None)
return result
@@ -367,10 +357,10 @@ cdef class PlasmaClient:
self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
result = []
for i in range(object_buffers.size()):
- result.append(
- self._make_plasma_buffer(object_ids[i],
- object_buffers[i].metadata,
- object_buffers[i].metadata_size))
+ if object_buffers[i].metadata.get() != nullptr:
+ result.append(pyarrow_wrap_buffer(object_buffers[i].metadata))
+ else:
+ result.append(None)
return result
def put(self, object value, ObjectID object_id=None, int memcopy_threads=6,
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 1df213d..4358841 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -190,7 +190,6 @@ class TestPlasmaClient(object):
plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
# Connect to Plasma.
self.plasma_client = plasma.connect(plasma_store_name, "", 64)
- # For the eviction test
self.plasma_client2 = plasma.connect(plasma_store_name, "", 0)
def teardown_method(self, test_method):
@@ -311,6 +310,36 @@ class TestPlasmaClient(object):
else:
assert results[i] is None
+ def test_buffer_lifetime(self):
+ # ARROW-2195
+ arr = pa.array([1, 12, 23, 3, 34], pa.int32())
+ batch = pa.RecordBatch.from_arrays([arr], ['field1'])
+
+ # Serialize RecordBatch into Plasma store
+ sink = pa.MockOutputStream()
+ writer = pa.RecordBatchStreamWriter(sink, batch.schema)
+ writer.write_batch(batch)
+ writer.close()
+
+ object_id = random_object_id()
+ data_buffer = self.plasma_client.create(object_id, sink.size())
+ stream = pa.FixedSizeBufferWriter(data_buffer)
+ writer = pa.RecordBatchStreamWriter(stream, batch.schema)
+ writer.write_batch(batch)
+ writer.close()
+ self.plasma_client.seal(object_id)
+ del data_buffer
+
+ # Unserialize RecordBatch from Plasma store
+ [data_buffer] = self.plasma_client2.get_buffers([object_id])
+ reader = pa.RecordBatchStreamReader(data_buffer)
+ read_batch = reader.read_next_batch()
+ # Lose reference to returned buffer. The RecordBatch must still
+ # be backed by valid memory.
+ del data_buffer, reader
+
+ assert read_batch.equals(batch)
+
def test_put_and_get(self):
for value in [["hello", "world", 3, 1.0], None, "hello"]:
object_id = self.plasma_client.put(value)
@@ -770,15 +799,15 @@ class TestPlasmaClient(object):
# them go out of scope.
for _ in range(100):
create_object(
- self.plasma_client,
+ self.plasma_client2,
np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0)
# Create large objects that require the full object store size, and
# verify that they fit.
for _ in range(2):
- create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY, 0)
+ create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
# Verify that an object that is too large does not fit.
with pytest.raises(pa.lib.PlasmaStoreFull):
- create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY + 1,
+ create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
0)
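
As a usage note for the CudaBuffer::FromBuffer() helper introduced above, the following sketch (not part of the commit; function name and error handling are illustrative only) shows how a device buffer can be recovered from a generic, possibly sliced Buffer and read back to host memory:

    #include <memory>
    #include "arrow/buffer.h"
    #include "arrow/status.h"
    #include "arrow/gpu/cuda_memory.h"

    // Sketch only: `buffer` is expected to be backed by GPU memory, e.g. the
    // data buffer of a Plasma object created with device_num >= 1.
    arrow::Status CopyDeviceBufferToHost(
        const std::shared_ptr<arrow::Buffer>& buffer, uint8_t* host_out,
        int64_t nbytes) {
      std::shared_ptr<arrow::gpu::CudaBuffer> gpu_buffer;
      // Walks parent buffers (e.g. slices) until the backing CudaBuffer is
      // found; returns TypeError if the buffer is not backed by a CudaBuffer.
      arrow::Status st = arrow::gpu::CudaBuffer::FromBuffer(buffer, &gpu_buffer);
      if (!st.ok()) {
        return st;
      }
      arrow::gpu::CudaBufferReader reader(gpu_buffer);
      int64_t bytes_read = 0;
      return reader.Read(nbytes, &bytes_read, host_out);
    }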