You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2020/03/09 02:26:15 UTC
[arrow] branch master updated: ARROW-7991: [C++][Plasma] Allow option for evicting if full when creating an object
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new af45b92 ARROW-7991: [C++][Plasma] Allow option for evicting if full when creating an object
af45b92 is described below
commit af45b9212156980f55c399e2e88b4e19b4bb8ec1
Author: Stephanie Wang <sw...@cs.berkeley.edu>
AuthorDate: Sun Mar 8 19:25:55 2020 -0700
ARROW-7991: [C++][Plasma] Allow option for evicting if full when creating an object
Allow the client to pass a flag (`evict_if_full`, default `true`) when creating an object, specifying whether other objects may be evicted to make space for the new object.
Closes #6520 from stephanie-wang/try-evict and squashes the following commits:
9a9dc1a5e <Stephanie Wang> Merge branch 'master' into try-evict
9e8c08f56 <Stephanie Wang> fix
2f38969a3 <Stephanie Wang> Merge remote-tracking branch 'upstream/master' into try-evict
a32ab9b83 <Stephanie Wang> Default evict_if_full arg
9ddc881df <Stephanie Wang> document arg
c2ba17c15 <Stephanie Wang> fix pyx
48e70bf76 <Stephanie Wang> Fix cpp
62b2f636a <Stephanie Wang> fix tests
288927472 <Stephanie Wang> protocol
ecef91564 <Stephanie Wang> Add flag to evict if full
Authored-by: Stephanie Wang <sw...@cs.berkeley.edu>
Signed-off-by: Philipp Moritz <pc...@gmail.com>
---
cpp/src/plasma/client.cc | 43 ++++++++++-------
cpp/src/plasma/client.h | 14 ++++--
cpp/src/plasma/plasma.fbs | 6 +++
cpp/src/plasma/plasma_generated.h | 75 +++++++++++++++++++++++++-----
cpp/src/plasma/protocol.cc | 27 +++++++----
cpp/src/plasma/protocol.h | 15 +++---
cpp/src/plasma/store.cc | 75 ++++++++++++++++++------------
cpp/src/plasma/store.h | 14 ++++--
cpp/src/plasma/test/serialization_tests.cc | 9 ++--
python/pyarrow/_plasma.pyx | 10 ++--
10 files changed, 197 insertions(+), 91 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 0640091..1fbb9d4 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -225,14 +225,16 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
Status SetClientOptions(const std::string& client_name, int64_t output_memory_quota);
Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
- int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0);
+ int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0,
+ bool evict_if_full = true);
Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
- const std::string& metadata);
+ const std::string& metadata, bool evict_if_full = true);
Status CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
- const std::vector<std::string>& metadata);
+ const std::vector<std::string>& metadata,
+ bool evict_if_full = true);
Status Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
std::vector<ObjectBuffer>* object_buffers);
@@ -416,13 +418,14 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
- std::shared_ptr<Buffer>* data, int device_num) {
+ std::shared_ptr<Buffer>* data, int device_num,
+ bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
<< data_size << " and metadata size " << metadata_size;
- RETURN_NOT_OK(
- SendCreateRequest(store_conn_, object_id, data_size, metadata_size, device_num));
+ RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, evict_if_full, data_size,
+ metadata_size, device_num));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, &buffer));
ObjectID id;
@@ -485,7 +488,8 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
const std::string& data,
- const std::string& metadata) {
+ const std::string& metadata,
+ bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
@@ -496,7 +500,8 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
reinterpret_cast<const uint8_t*>(metadata.data()), metadata.size());
memcpy(&digest[0], &hash, sizeof(hash));
- RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, data, metadata, digest));
+ RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, evict_if_full, data,
+ metadata, digest));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealReply, &buffer));
@@ -506,7 +511,8 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
- const std::vector<std::string>& metadata) {
+ const std::vector<std::string>& metadata,
+ bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
ARROW_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_;
@@ -522,8 +528,8 @@ Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector<ObjectID>& objec
digests.push_back(digest);
}
- RETURN_NOT_OK(
- SendCreateAndSealBatchRequest(store_conn_, object_ids, data, metadata, digests));
+ RETURN_NOT_OK(SendCreateAndSealBatchRequest(store_conn_, object_ids, evict_if_full,
+ data, metadata, digests));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealBatchReply, &buffer));
@@ -1131,19 +1137,22 @@ Status PlasmaClient::SetClientOptions(const std::string& client_name,
Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
- std::shared_ptr<Buffer>* data, int device_num) {
- return impl_->Create(object_id, data_size, metadata, metadata_size, data, device_num);
+ std::shared_ptr<Buffer>* data, int device_num,
+ bool evict_if_full) {
+ return impl_->Create(object_id, data_size, metadata, metadata_size, data, device_num,
+ evict_if_full);
}
Status PlasmaClient::CreateAndSeal(const ObjectID& object_id, const std::string& data,
- const std::string& metadata) {
- return impl_->CreateAndSeal(object_id, data, metadata);
+ const std::string& metadata, bool evict_if_full) {
+ return impl_->CreateAndSeal(object_id, data, metadata, evict_if_full);
}
Status PlasmaClient::CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
- const std::vector<std::string>& metadata) {
- return impl_->CreateAndSealBatch(object_ids, data, metadata);
+ const std::vector<std::string>& metadata,
+ bool evict_if_full) {
+ return impl_->CreateAndSealBatch(object_ids, data, metadata, evict_if_full);
}
Status PlasmaClient::Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 64ceccc..76d04a8 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -90,12 +90,15 @@ class ARROW_EXPORT PlasmaClient {
/// device_num = 0 corresponds to the host,
/// device_num = 1 corresponds to GPU0,
/// device_num = 2 corresponds to GPU1, etc.
+ /// \param evict_if_full Whether to evict other objects to make space for
+ /// this object.
/// \return The return status.
///
/// The returned object must be released once it is done with. It must also
/// be either sealed or aborted.
Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
- int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0);
+ int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0,
+ bool evict_if_full = true);
/// Create and seal an object in the object store. This is an optimization
/// which allows small objects to be created quickly with fewer messages to
@@ -104,9 +107,11 @@ class ARROW_EXPORT PlasmaClient {
/// \param object_id The ID of the object to create.
/// \param data The data for the object to create.
/// \param metadata The metadata for the object to create.
+ /// \param evict_if_full Whether to evict other objects to make space for
+ /// this object.
/// \return The return status.
Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
- const std::string& metadata);
+ const std::string& metadata, bool evict_if_full = true);
/// Create and seal multiple objects in the object store. This is an optimization
/// of CreateAndSeal to eliminate the cost of IPC per object.
@@ -114,10 +119,13 @@ class ARROW_EXPORT PlasmaClient {
/// \param object_ids The vector of IDs of the objects to create.
/// \param data The vector of data for the objects to create.
/// \param metadata The vector of metadata for the objects to create.
+ /// \param evict_if_full Whether to evict other objects to make space for
+ /// these objects.
/// \return The return status.
Status CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
- const std::vector<std::string>& metadata);
+ const std::vector<std::string>& metadata,
+ bool evict_if_full = true);
/// Get some objects from the Plasma Store. This function will block until the
/// objects have all been created and sealed in the Plasma Store or the
diff --git a/cpp/src/plasma/plasma.fbs b/cpp/src/plasma/plasma.fbs
index 16b2628..40556b4 100644
--- a/cpp/src/plasma/plasma.fbs
+++ b/cpp/src/plasma/plasma.fbs
@@ -139,6 +139,8 @@ table PlasmaGetDebugStringReply {
table PlasmaCreateRequest {
// ID of the object to be created.
object_id: string;
+ // Whether to evict other objects to make room for this one.
+ evict_if_full: bool;
// The size of the object's data in bytes.
data_size: ulong;
// The size of the object's metadata in bytes.
@@ -171,6 +173,8 @@ table PlasmaCreateReply {
table PlasmaCreateAndSealRequest {
// ID of the object to be created.
object_id: string;
+ // Whether to evict other objects to make room for this one.
+ evict_if_full: bool;
// The object's data.
data: string;
// The object's metadata.
@@ -186,6 +190,8 @@ table PlasmaCreateAndSealReply {
table PlasmaCreateAndSealBatchRequest {
object_ids: [string];
+ // Whether to evict other objects to make room for these objects.
+ evict_if_full: bool;
data: [string];
metadata: [string];
digest: [string];
diff --git a/cpp/src/plasma/plasma_generated.h b/cpp/src/plasma/plasma_generated.h
index 43edbee..3331859 100644
--- a/cpp/src/plasma/plasma_generated.h
+++ b/cpp/src/plasma/plasma_generated.h
@@ -577,11 +577,13 @@ flatbuffers::Offset<PlasmaGetDebugStringReply> CreatePlasmaGetDebugStringReply(f
struct PlasmaCreateRequestT : public flatbuffers::NativeTable {
typedef PlasmaCreateRequest TableType;
std::string object_id;
+ bool evict_if_full;
uint64_t data_size;
uint64_t metadata_size;
int32_t device_num;
PlasmaCreateRequestT()
- : data_size(0),
+ : evict_if_full(false),
+ data_size(0),
metadata_size(0),
device_num(0) {
}
@@ -591,13 +593,17 @@ struct PlasmaCreateRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table
typedef PlasmaCreateRequestT NativeTableType;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_OBJECT_ID = 4,
- VT_DATA_SIZE = 6,
- VT_METADATA_SIZE = 8,
- VT_DEVICE_NUM = 10
+ VT_EVICT_IF_FULL = 6,
+ VT_DATA_SIZE = 8,
+ VT_METADATA_SIZE = 10,
+ VT_DEVICE_NUM = 12
};
const flatbuffers::String *object_id() const {
return GetPointer<const flatbuffers::String *>(VT_OBJECT_ID);
}
+ bool evict_if_full() const {
+ return GetField<uint8_t>(VT_EVICT_IF_FULL, 0) != 0;
+ }
uint64_t data_size() const {
return GetField<uint64_t>(VT_DATA_SIZE, 0);
}
@@ -611,6 +617,7 @@ struct PlasmaCreateRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_OBJECT_ID) &&
verifier.VerifyString(object_id()) &&
+ VerifyField<uint8_t>(verifier, VT_EVICT_IF_FULL) &&
VerifyField<uint64_t>(verifier, VT_DATA_SIZE) &&
VerifyField<uint64_t>(verifier, VT_METADATA_SIZE) &&
VerifyField<int32_t>(verifier, VT_DEVICE_NUM) &&
@@ -627,6 +634,9 @@ struct PlasmaCreateRequestBuilder {
void add_object_id(flatbuffers::Offset<flatbuffers::String> object_id) {
fbb_.AddOffset(PlasmaCreateRequest::VT_OBJECT_ID, object_id);
}
+ void add_evict_if_full(bool evict_if_full) {
+ fbb_.AddElement<uint8_t>(PlasmaCreateRequest::VT_EVICT_IF_FULL, static_cast<uint8_t>(evict_if_full), 0);
+ }
void add_data_size(uint64_t data_size) {
fbb_.AddElement<uint64_t>(PlasmaCreateRequest::VT_DATA_SIZE, data_size, 0);
}
@@ -651,6 +661,7 @@ struct PlasmaCreateRequestBuilder {
inline flatbuffers::Offset<PlasmaCreateRequest> CreatePlasmaCreateRequest(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::String> object_id = 0,
+ bool evict_if_full = false,
uint64_t data_size = 0,
uint64_t metadata_size = 0,
int32_t device_num = 0) {
@@ -659,12 +670,14 @@ inline flatbuffers::Offset<PlasmaCreateRequest> CreatePlasmaCreateRequest(
builder_.add_data_size(data_size);
builder_.add_device_num(device_num);
builder_.add_object_id(object_id);
+ builder_.add_evict_if_full(evict_if_full);
return builder_.Finish();
}
inline flatbuffers::Offset<PlasmaCreateRequest> CreatePlasmaCreateRequestDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const char *object_id = nullptr,
+ bool evict_if_full = false,
uint64_t data_size = 0,
uint64_t metadata_size = 0,
int32_t device_num = 0) {
@@ -672,6 +685,7 @@ inline flatbuffers::Offset<PlasmaCreateRequest> CreatePlasmaCreateRequestDirect(
return plasma::flatbuf::CreatePlasmaCreateRequest(
_fbb,
object_id__,
+ evict_if_full,
data_size,
metadata_size,
device_num);
@@ -877,10 +891,12 @@ flatbuffers::Offset<PlasmaCreateReply> CreatePlasmaCreateReply(flatbuffers::Flat
struct PlasmaCreateAndSealRequestT : public flatbuffers::NativeTable {
typedef PlasmaCreateAndSealRequest TableType;
std::string object_id;
+ bool evict_if_full;
std::string data;
std::string metadata;
std::string digest;
- PlasmaCreateAndSealRequestT() {
+ PlasmaCreateAndSealRequestT()
+ : evict_if_full(false) {
}
};
@@ -888,13 +904,17 @@ struct PlasmaCreateAndSealRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers:
typedef PlasmaCreateAndSealRequestT NativeTableType;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_OBJECT_ID = 4,
- VT_DATA = 6,
- VT_METADATA = 8,
- VT_DIGEST = 10
+ VT_EVICT_IF_FULL = 6,
+ VT_DATA = 8,
+ VT_METADATA = 10,
+ VT_DIGEST = 12
};
const flatbuffers::String *object_id() const {
return GetPointer<const flatbuffers::String *>(VT_OBJECT_ID);
}
+ bool evict_if_full() const {
+ return GetField<uint8_t>(VT_EVICT_IF_FULL, 0) != 0;
+ }
const flatbuffers::String *data() const {
return GetPointer<const flatbuffers::String *>(VT_DATA);
}
@@ -908,6 +928,7 @@ struct PlasmaCreateAndSealRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers:
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_OBJECT_ID) &&
verifier.VerifyString(object_id()) &&
+ VerifyField<uint8_t>(verifier, VT_EVICT_IF_FULL) &&
VerifyOffset(verifier, VT_DATA) &&
verifier.VerifyString(data()) &&
VerifyOffset(verifier, VT_METADATA) &&
@@ -927,6 +948,9 @@ struct PlasmaCreateAndSealRequestBuilder {
void add_object_id(flatbuffers::Offset<flatbuffers::String> object_id) {
fbb_.AddOffset(PlasmaCreateAndSealRequest::VT_OBJECT_ID, object_id);
}
+ void add_evict_if_full(bool evict_if_full) {
+ fbb_.AddElement<uint8_t>(PlasmaCreateAndSealRequest::VT_EVICT_IF_FULL, static_cast<uint8_t>(evict_if_full), 0);
+ }
void add_data(flatbuffers::Offset<flatbuffers::String> data) {
fbb_.AddOffset(PlasmaCreateAndSealRequest::VT_DATA, data);
}
@@ -951,6 +975,7 @@ struct PlasmaCreateAndSealRequestBuilder {
inline flatbuffers::Offset<PlasmaCreateAndSealRequest> CreatePlasmaCreateAndSealRequest(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::String> object_id = 0,
+ bool evict_if_full = false,
flatbuffers::Offset<flatbuffers::String> data = 0,
flatbuffers::Offset<flatbuffers::String> metadata = 0,
flatbuffers::Offset<flatbuffers::String> digest = 0) {
@@ -959,12 +984,14 @@ inline flatbuffers::Offset<PlasmaCreateAndSealRequest> CreatePlasmaCreateAndSeal
builder_.add_metadata(metadata);
builder_.add_data(data);
builder_.add_object_id(object_id);
+ builder_.add_evict_if_full(evict_if_full);
return builder_.Finish();
}
inline flatbuffers::Offset<PlasmaCreateAndSealRequest> CreatePlasmaCreateAndSealRequestDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const char *object_id = nullptr,
+ bool evict_if_full = false,
const char *data = nullptr,
const char *metadata = nullptr,
const char *digest = nullptr) {
@@ -975,6 +1002,7 @@ inline flatbuffers::Offset<PlasmaCreateAndSealRequest> CreatePlasmaCreateAndSeal
return plasma::flatbuf::CreatePlasmaCreateAndSealRequest(
_fbb,
object_id__,
+ evict_if_full,
data__,
metadata__,
digest__);
@@ -1039,10 +1067,12 @@ flatbuffers::Offset<PlasmaCreateAndSealReply> CreatePlasmaCreateAndSealReply(fla
struct PlasmaCreateAndSealBatchRequestT : public flatbuffers::NativeTable {
typedef PlasmaCreateAndSealBatchRequest TableType;
std::vector<std::string> object_ids;
+ bool evict_if_full;
std::vector<std::string> data;
std::vector<std::string> metadata;
std::vector<std::string> digest;
- PlasmaCreateAndSealBatchRequestT() {
+ PlasmaCreateAndSealBatchRequestT()
+ : evict_if_full(false) {
}
};
@@ -1050,13 +1080,17 @@ struct PlasmaCreateAndSealBatchRequest FLATBUFFERS_FINAL_CLASS : private flatbuf
typedef PlasmaCreateAndSealBatchRequestT NativeTableType;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_OBJECT_IDS = 4,
- VT_DATA = 6,
- VT_METADATA = 8,
- VT_DIGEST = 10
+ VT_EVICT_IF_FULL = 6,
+ VT_DATA = 8,
+ VT_METADATA = 10,
+ VT_DIGEST = 12
};
const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> *object_ids() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> *>(VT_OBJECT_IDS);
}
+ bool evict_if_full() const {
+ return GetField<uint8_t>(VT_EVICT_IF_FULL, 0) != 0;
+ }
const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> *data() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> *>(VT_DATA);
}
@@ -1071,6 +1105,7 @@ struct PlasmaCreateAndSealBatchRequest FLATBUFFERS_FINAL_CLASS : private flatbuf
VerifyOffset(verifier, VT_OBJECT_IDS) &&
verifier.VerifyVector(object_ids()) &&
verifier.VerifyVectorOfStrings(object_ids()) &&
+ VerifyField<uint8_t>(verifier, VT_EVICT_IF_FULL) &&
VerifyOffset(verifier, VT_DATA) &&
verifier.VerifyVector(data()) &&
verifier.VerifyVectorOfStrings(data()) &&
@@ -1093,6 +1128,9 @@ struct PlasmaCreateAndSealBatchRequestBuilder {
void add_object_ids(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> object_ids) {
fbb_.AddOffset(PlasmaCreateAndSealBatchRequest::VT_OBJECT_IDS, object_ids);
}
+ void add_evict_if_full(bool evict_if_full) {
+ fbb_.AddElement<uint8_t>(PlasmaCreateAndSealBatchRequest::VT_EVICT_IF_FULL, static_cast<uint8_t>(evict_if_full), 0);
+ }
void add_data(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> data) {
fbb_.AddOffset(PlasmaCreateAndSealBatchRequest::VT_DATA, data);
}
@@ -1117,6 +1155,7 @@ struct PlasmaCreateAndSealBatchRequestBuilder {
inline flatbuffers::Offset<PlasmaCreateAndSealBatchRequest> CreatePlasmaCreateAndSealBatchRequest(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> object_ids = 0,
+ bool evict_if_full = false,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> data = 0,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> metadata = 0,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> digest = 0) {
@@ -1125,12 +1164,14 @@ inline flatbuffers::Offset<PlasmaCreateAndSealBatchRequest> CreatePlasmaCreateAn
builder_.add_metadata(metadata);
builder_.add_data(data);
builder_.add_object_ids(object_ids);
+ builder_.add_evict_if_full(evict_if_full);
return builder_.Finish();
}
inline flatbuffers::Offset<PlasmaCreateAndSealBatchRequest> CreatePlasmaCreateAndSealBatchRequestDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const std::vector<flatbuffers::Offset<flatbuffers::String>> *object_ids = nullptr,
+ bool evict_if_full = false,
const std::vector<flatbuffers::Offset<flatbuffers::String>> *data = nullptr,
const std::vector<flatbuffers::Offset<flatbuffers::String>> *metadata = nullptr,
const std::vector<flatbuffers::Offset<flatbuffers::String>> *digest = nullptr) {
@@ -1141,6 +1182,7 @@ inline flatbuffers::Offset<PlasmaCreateAndSealBatchRequest> CreatePlasmaCreateAn
return plasma::flatbuf::CreatePlasmaCreateAndSealBatchRequest(
_fbb,
object_ids__,
+ evict_if_full,
data__,
metadata__,
digest__);
@@ -2948,6 +2990,7 @@ inline void PlasmaCreateRequest::UnPackTo(PlasmaCreateRequestT *_o, const flatbu
(void)_o;
(void)_resolver;
{ auto _e = object_id(); if (_e) _o->object_id = _e->str(); };
+ { auto _e = evict_if_full(); _o->evict_if_full = _e; };
{ auto _e = data_size(); _o->data_size = _e; };
{ auto _e = metadata_size(); _o->metadata_size = _e; };
{ auto _e = device_num(); _o->device_num = _e; };
@@ -2962,12 +3005,14 @@ inline flatbuffers::Offset<PlasmaCreateRequest> CreatePlasmaCreateRequest(flatbu
(void)_o;
struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const PlasmaCreateRequestT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va;
auto _object_id = _o->object_id.empty() ? 0 : _fbb.CreateString(_o->object_id);
+ auto _evict_if_full = _o->evict_if_full;
auto _data_size = _o->data_size;
auto _metadata_size = _o->metadata_size;
auto _device_num = _o->device_num;
return plasma::flatbuf::CreatePlasmaCreateRequest(
_fbb,
_object_id,
+ _evict_if_full,
_data_size,
_metadata_size,
_device_num);
@@ -3050,6 +3095,7 @@ inline void PlasmaCreateAndSealRequest::UnPackTo(PlasmaCreateAndSealRequestT *_o
(void)_o;
(void)_resolver;
{ auto _e = object_id(); if (_e) _o->object_id = _e->str(); };
+ { auto _e = evict_if_full(); _o->evict_if_full = _e; };
{ auto _e = data(); if (_e) _o->data = _e->str(); };
{ auto _e = metadata(); if (_e) _o->metadata = _e->str(); };
{ auto _e = digest(); if (_e) _o->digest = _e->str(); };
@@ -3064,12 +3110,14 @@ inline flatbuffers::Offset<PlasmaCreateAndSealRequest> CreatePlasmaCreateAndSeal
(void)_o;
struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const PlasmaCreateAndSealRequestT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va;
auto _object_id = _o->object_id.empty() ? 0 : _fbb.CreateString(_o->object_id);
+ auto _evict_if_full = _o->evict_if_full;
auto _data = _o->data.empty() ? 0 : _fbb.CreateString(_o->data);
auto _metadata = _o->metadata.empty() ? 0 : _fbb.CreateString(_o->metadata);
auto _digest = _o->digest.empty() ? 0 : _fbb.CreateString(_o->digest);
return plasma::flatbuf::CreatePlasmaCreateAndSealRequest(
_fbb,
_object_id,
+ _evict_if_full,
_data,
_metadata,
_digest);
@@ -3111,6 +3159,7 @@ inline void PlasmaCreateAndSealBatchRequest::UnPackTo(PlasmaCreateAndSealBatchRe
(void)_o;
(void)_resolver;
{ auto _e = object_ids(); if (_e) { _o->object_ids.resize(_e->size()); for (flatbuffers::uoffset_t _i = 0; _i < _e->size(); _i++) { _o->object_ids[_i] = _e->Get(_i)->str(); } } };
+ { auto _e = evict_if_full(); _o->evict_if_full = _e; };
{ auto _e = data(); if (_e) { _o->data.resize(_e->size()); for (flatbuffers::uoffset_t _i = 0; _i < _e->size(); _i++) { _o->data[_i] = _e->Get(_i)->str(); } } };
{ auto _e = metadata(); if (_e) { _o->metadata.resize(_e->size()); for (flatbuffers::uoffset_t _i = 0; _i < _e->size(); _i++) { _o->metadata[_i] = _e->Get(_i)->str(); } } };
{ auto _e = digest(); if (_e) { _o->digest.resize(_e->size()); for (flatbuffers::uoffset_t _i = 0; _i < _e->size(); _i++) { _o->digest[_i] = _e->Get(_i)->str(); } } };
@@ -3125,12 +3174,14 @@ inline flatbuffers::Offset<PlasmaCreateAndSealBatchRequest> CreatePlasmaCreateAn
(void)_o;
struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const PlasmaCreateAndSealBatchRequestT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va;
auto _object_ids = _o->object_ids.size() ? _fbb.CreateVectorOfStrings(_o->object_ids) : 0;
+ auto _evict_if_full = _o->evict_if_full;
auto _data = _o->data.size() ? _fbb.CreateVectorOfStrings(_o->data) : 0;
auto _metadata = _o->metadata.size() ? _fbb.CreateVectorOfStrings(_o->metadata) : 0;
auto _digest = _o->digest.size() ? _fbb.CreateVectorOfStrings(_o->digest) : 0;
return plasma::flatbuf::CreatePlasmaCreateAndSealBatchRequest(
_fbb,
_object_ids,
+ _evict_if_full,
_data,
_metadata,
_digest);
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index b4468ca..17fef52 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -183,19 +183,22 @@ Status ReadGetDebugStringReply(uint8_t* data, size_t size, std::string* debug_st
// Create messages.
-Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
- int64_t metadata_size, int device_num) {
+Status SendCreateRequest(int sock, ObjectID object_id, bool evict_if_full,
+ int64_t data_size, int64_t metadata_size, int device_num) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.binary()),
- data_size, metadata_size, device_num);
+ auto message =
+ fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.binary()),
+ evict_if_full, data_size, metadata_size, device_num);
return PlasmaSend(sock, MessageType::PlasmaCreateRequest, &fbb, message);
}
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
- int64_t* data_size, int64_t* metadata_size, int* device_num) {
+ bool* evict_if_full, int64_t* data_size, int64_t* metadata_size,
+ int* device_num) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
+ *evict_if_full = message->evict_if_full();
*data_size = message->data_size();
*metadata_size = message->metadata_size();
*object_id = ObjectID::from_binary(message->object_id()->str());
@@ -262,25 +265,26 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
return PlasmaErrorStatus(message->error());
}
-Status SendCreateAndSealRequest(int sock, const ObjectID& object_id,
+Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full,
const std::string& data, const std::string& metadata,
unsigned char* digest) {
flatbuffers::FlatBufferBuilder fbb;
auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), kDigestSize);
auto message = fb::CreatePlasmaCreateAndSealRequest(
- fbb, fbb.CreateString(object_id.binary()), fbb.CreateString(data),
+ fbb, fbb.CreateString(object_id.binary()), evict_if_full, fbb.CreateString(data),
fbb.CreateString(metadata), digest_string);
return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message);
}
Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
- std::string* object_data, std::string* metadata,
- std::string* digest) {
+ bool* evict_if_full, std::string* object_data,
+ std::string* metadata, std::string* digest) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
+ *evict_if_full = message->evict_if_full();
*object_data = message->data()->str();
*metadata = message->metadata()->str();
ARROW_CHECK(message->digest()->size() == kDigestSize);
@@ -289,13 +293,14 @@ Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
}
Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& object_ids,
+ bool evict_if_full,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata,
const std::vector<std::string>& digests) {
flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaCreateAndSealBatchRequest(
- fbb, ToFlatbuffer(&fbb, object_ids.data(), object_ids.size()),
+ fbb, ToFlatbuffer(&fbb, object_ids.data(), object_ids.size()), evict_if_full,
ToFlatbuffer(&fbb, data), ToFlatbuffer(&fbb, metadata),
ToFlatbuffer(&fbb, digests));
@@ -304,6 +309,7 @@ Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& obje
Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size,
std::vector<ObjectID>* object_ids,
+ bool* evict_if_full,
std::vector<std::string>* object_data,
std::vector<std::string>* metadata,
std::vector<std::string>* digests) {
@@ -311,6 +317,7 @@ Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size,
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
+ *evict_if_full = message->evict_if_full();
ConvertToVector(message->object_ids(), object_ids,
[](const flatbuffers::String& element) {
return ObjectID::from_binary(element.str());
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 6b93b24..37d03a0 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -77,11 +77,12 @@ Status ReadGetDebugStringReply(uint8_t* data, size_t size, std::string* debug_st
/* Plasma Create message functions. */
-Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
- int64_t metadata_size, int device_num);
+Status SendCreateRequest(int sock, ObjectID object_id, bool evict_if_full,
+ int64_t data_size, int64_t metadata_size, int device_num);
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
- int64_t* data_size, int64_t* metadata_size, int* device_num);
+ bool* evict_if_full, int64_t* data_size, int64_t* metadata_size,
+ int* device_num);
Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
PlasmaError error, int64_t mmap_size);
@@ -89,21 +90,23 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
PlasmaObject* object, int* store_fd, int64_t* mmap_size);
-Status SendCreateAndSealRequest(int sock, const ObjectID& object_id,
+Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full,
const std::string& data, const std::string& metadata,
unsigned char* digest);
Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
- std::string* object_data, std::string* metadata,
- std::string* digest);
+ bool* evict_if_full, std::string* object_data,
+ std::string* metadata, std::string* digest);
Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& object_ids,
+ bool evict_if_full,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata,
const std::vector<std::string>& digests);
Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size,
std::vector<ObjectID>* object_id,
+ bool* evict_if_full,
std::vector<std::string>* object_data,
std::vector<std::string>* metadata,
std::vector<std::string>* digests);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 81e3c17..977ba9f 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -152,16 +152,19 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEnt
}
// Allocate memory
-uint8_t* PlasmaStore::AllocateMemory(size_t size, int* fd, int64_t* map_size,
- ptrdiff_t* offset, Client* client, bool is_create) {
+uint8_t* PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, int* fd,
+ int64_t* map_size, ptrdiff_t* offset, Client* client,
+ bool is_create) {
// First free up space from the client's LRU queue if quota enforcement is on.
- std::vector<ObjectID> client_objects_to_evict;
- bool quota_ok = eviction_policy_.EnforcePerClientQuota(client, size, is_create,
- &client_objects_to_evict);
- if (!quota_ok) {
- return nullptr;
+ if (evict_if_full) {
+ std::vector<ObjectID> client_objects_to_evict;
+ bool quota_ok = eviction_policy_.EnforcePerClientQuota(client, size, is_create,
+ &client_objects_to_evict);
+ if (!quota_ok) {
+ return nullptr;
+ }
+ EvictObjects(client_objects_to_evict);
}
- EvictObjects(client_objects_to_evict);
// Try to evict objects until there is enough space.
uint8_t* pointer = nullptr;
@@ -174,7 +177,10 @@ uint8_t* PlasmaStore::AllocateMemory(size_t size, int* fd, int64_t* map_size,
// it is not guaranteed that the corresponding pointer in the client will be
// 64-byte aligned, but in practice it often will be.
pointer = reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size));
- if (pointer) {
+ if (pointer || !evict_if_full) {
+ // If we manage to allocate the memory, return the pointer. If we cannot
+ // allocate the space, but we are also not allowed to evict anything to
+ // make more space, return an error to the client.
break;
}
// Tell the eviction policy how much space we need to create this object.
@@ -184,11 +190,14 @@ uint8_t* PlasmaStore::AllocateMemory(size_t size, int* fd, int64_t* map_size,
// Return an error to the client if not enough space could be freed to
// create the object.
if (!success) {
- return nullptr;
+ break;
}
}
- GetMallocMapinfo(pointer, fd, map_size, offset);
- ARROW_CHECK(*fd != -1);
+
+ if (pointer != nullptr) {
+ GetMallocMapinfo(pointer, fd, map_size, offset);
+ ARROW_CHECK(*fd != -1);
+ }
return pointer;
}
@@ -212,9 +221,10 @@ Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t* pointe
#endif
// Create a new object buffer in the hash table.
-PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_size,
- int64_t metadata_size, int device_num,
- Client* client, PlasmaObject* result) {
+PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_full,
+ int64_t data_size, int64_t metadata_size,
+ int device_num, Client* client,
+ PlasmaObject* result) {
ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
auto entry = GetObjectTableEntry(&store_info_, object_id);
@@ -231,7 +241,8 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
auto total_size = data_size + metadata_size;
if (device_num == 0) {
- pointer = AllocateMemory(total_size, &fd, &map_size, &offset, client, true);
+ pointer =
+ AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true);
if (!pointer) {
ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex()
<< ", data_size=" << data_size
@@ -454,8 +465,9 @@ void PlasmaStore::ProcessGetRequest(Client* client,
// Make sure the object pointer is not already allocated
ARROW_CHECK(!entry->pointer);
- entry->pointer = AllocateMemory(entry->data_size + entry->metadata_size, &entry->fd,
- &entry->map_size, &entry->offset, client, false);
+ entry->pointer =
+ AllocateMemory(entry->data_size + entry->metadata_size, /*evict_if_full=*/true,
+ &entry->fd, &entry->map_size, &entry->offset, client, false);
if (entry->pointer) {
entry->state = ObjectState::PLASMA_CREATED;
entry->create_time = std::time(nullptr);
@@ -925,13 +937,14 @@ Status PlasmaStore::ProcessMessage(Client* client) {
// Process the different types of requests.
switch (type) {
case fb::MessageType::PlasmaCreateRequest: {
+ bool evict_if_full;
int64_t data_size;
int64_t metadata_size;
int device_num;
- RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id, &data_size,
- &metadata_size, &device_num));
- PlasmaError error_code =
- CreateObject(object_id, data_size, metadata_size, device_num, client, &object);
+ RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id, &evict_if_full,
+ &data_size, &metadata_size, &device_num));
+ PlasmaError error_code = CreateObject(object_id, evict_if_full, data_size,
+ metadata_size, device_num, client, &object);
int64_t mmap_size = 0;
if (error_code == PlasmaError::OK && device_num == 0) {
mmap_size = GetMmapSize(object.store_fd);
@@ -948,17 +961,18 @@ Status PlasmaStore::ProcessMessage(Client* client) {
}
} break;
case fb::MessageType::PlasmaCreateAndSealRequest: {
+ bool evict_if_full;
std::string data;
std::string metadata;
std::string digest;
digest.reserve(kDigestSize);
- RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id, &data,
- &metadata, &digest));
+ RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id,
+ &evict_if_full, &data, &metadata, &digest));
// CreateAndSeal currently only supports device_num = 0, which corresponds
// to the host.
int device_num = 0;
- PlasmaError error_code = CreateObject(object_id, data.size(), metadata.size(),
- device_num, client, &object);
+ PlasmaError error_code = CreateObject(object_id, evict_if_full, data.size(),
+ metadata.size(), device_num, client, &object);
// If the object was successfully created, fill out the object data and seal it.
if (error_code == PlasmaError::OK) {
@@ -979,13 +993,14 @@ Status PlasmaStore::ProcessMessage(Client* client) {
HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code), client->fd);
} break;
case fb::MessageType::PlasmaCreateAndSealBatchRequest: {
+ bool evict_if_full;
std::vector<ObjectID> object_ids;
std::vector<std::string> data;
std::vector<std::string> metadata;
std::vector<std::string> digests;
- RETURN_NOT_OK(ReadCreateAndSealBatchRequest(input, input_size, &object_ids, &data,
- &metadata, &digests));
+ RETURN_NOT_OK(ReadCreateAndSealBatchRequest(
+ input, input_size, &object_ids, &evict_if_full, &data, &metadata, &digests));
// CreateAndSeal currently only supports device_num = 0, which corresponds
// to the host.
@@ -993,8 +1008,8 @@ Status PlasmaStore::ProcessMessage(Client* client) {
size_t i = 0;
PlasmaError error_code = PlasmaError::OK;
for (i = 0; i < object_ids.size(); i++) {
- error_code = CreateObject(object_ids[i], data[i].size(), metadata[i].size(),
- device_num, client, &object);
+ error_code = CreateObject(object_ids[i], evict_if_full, data[i].size(),
+ metadata[i].size(), device_num, client, &object);
if (error_code != PlasmaError::OK) {
break;
}
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 608349b..7c3b5fb 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -72,6 +72,10 @@ class PlasmaStore {
/// the store when it is done with the object.
///
/// \param object_id Object ID of the object to be created.
+ /// \param evict_if_full If this is true, then when the object store is full,
+ /// try to evict objects that are not currently referenced before
+ /// creating the object. Else, do not evict any objects and
+ /// immediately return a PlasmaError::OutOfMemory.
/// \param data_size Size in bytes of the object to be created.
/// \param metadata_size Size in bytes of the object metadata.
/// \param device_num The number of the device where the object is being
@@ -89,9 +93,9 @@ class PlasmaStore {
/// - PlasmaError::OutOfMemory, if the store is out of memory and
/// cannot create the object. In this case, the client should not call
/// plasma_release.
- PlasmaError CreateObject(const ObjectID& object_id, int64_t data_size,
- int64_t metadata_size, int device_num, Client* client,
- PlasmaObject* result);
+ PlasmaError CreateObject(const ObjectID& object_id, bool evict_if_full,
+ int64_t data_size, int64_t metadata_size, int device_num,
+ Client* client, PlasmaObject* result);
/// Abort a created but unsealed object. If the client is not the
/// creator, then the abort will fail.
@@ -200,8 +204,8 @@ class PlasmaStore {
void EraseFromObjectTable(const ObjectID& object_id);
- uint8_t* AllocateMemory(size_t size, int* fd, int64_t* map_size, ptrdiff_t* offset,
- Client* client, bool is_create);
+ uint8_t* AllocateMemory(size_t size, bool evict_if_full, int* fd, int64_t* map_size,
+ ptrdiff_t* offset, Client* client, bool is_create);
#ifdef PLASMA_CUDA
Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer,
std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle);
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index 96c3760..a9eea7b 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -97,15 +97,18 @@ TEST_F(TestPlasmaSerialization, CreateRequest) {
int64_t data_size1 = 42;
int64_t metadata_size1 = 11;
int device_num1 = 0;
- ASSERT_OK(SendCreateRequest(fd, object_id1, data_size1, metadata_size1, device_num1));
+ ASSERT_OK(SendCreateRequest(fd, object_id1, /*evict_if_full=*/true, data_size1,
+ metadata_size1, device_num1));
std::vector<uint8_t> data =
read_message_from_file(fd, MessageType::PlasmaCreateRequest);
ObjectID object_id2;
+ bool evict_if_full;
int64_t data_size2;
int64_t metadata_size2;
int device_num2;
- ASSERT_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &data_size2,
- &metadata_size2, &device_num2));
+ ASSERT_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &evict_if_full,
+ &data_size2, &metadata_size2, &device_num2));
+ ASSERT_TRUE(evict_if_full);
ASSERT_EQ(data_size1, data_size2);
ASSERT_EQ(metadata_size1, metadata_size2);
ASSERT_EQ(object_id1, object_id2);
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 8e49aaf..e5e9bfe 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -101,12 +101,12 @@ cdef extern from "plasma/client.h" nogil:
const c_string& manager_socket_name,
int release_delay, int num_retries)
- CStatus Create(const CUniqueID& object_id, int64_t data_size,
- const uint8_t* metadata, int64_t metadata_size,
- const shared_ptr[CBuffer]* data)
+ CStatus Create(const CUniqueID& object_id,
+ int64_t data_size, const uint8_t* metadata, int64_t
+ metadata_size, const shared_ptr[CBuffer]* data)
- CStatus CreateAndSeal(const CUniqueID& object_id, const c_string& data,
- const c_string& metadata)
+ CStatus CreateAndSeal(const CUniqueID& object_id,
+ const c_string& data, const c_string& metadata)
CStatus Get(const c_vector[CUniqueID] object_ids, int64_t timeout_ms,
c_vector[CObjectBuffer]* object_buffers)