You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/01/20 21:41:32 UTC
[arrow] branch master updated: ARROW-2000: [Plasma] Deduplicate file descriptors when replying to GetRequest.
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d135974 ARROW-2000: [Plasma] Deduplicate file descriptors when replying to GetRequest.
d135974 is described below
commit d135974a0d3dd9a9fbbb10da4c5dbc65f9324234
Author: Robert Nishihara <ro...@gmail.com>
AuthorDate: Sat Jan 20 13:41:23 2018 -0800
ARROW-2000: [Plasma] Deduplicate file descriptors when replying to GetRequest.
Author: Robert Nishihara <ro...@gmail.com>
Closes #1479 from robertnishihara/deduplicatefiledescriptors and squashes the following commits:
9be9643 [Robert Nishihara] Fix bug.
8a827cf [Robert Nishihara] Remove mmap_size from PlasmaObject.
ab30d7d [Robert Nishihara] Fix tests.
2916e87 [Robert Nishihara] Remove mmap_size from PlasmaObjectSpec, and file_descriptor -> fd.
7f5c618 [Robert Nishihara] Deduplicate file descriptors when store replies to Get.
ab12d63 [Robert Nishihara] Make Create return a MutableBuffer.
---
cpp/src/plasma/client.cc | 45 ++++++++++----------
cpp/src/plasma/client.h | 5 ++-
cpp/src/plasma/format/plasma.fbs | 20 +++++++--
cpp/src/plasma/malloc.cc | 10 +++++
cpp/src/plasma/malloc.h | 6 +++
cpp/src/plasma/plasma.h | 12 +-----
cpp/src/plasma/protocol.cc | 49 +++++++++++++---------
cpp/src/plasma/protocol.h | 11 +++--
cpp/src/plasma/store.cc | 67 ++++++++++++++++++------------
cpp/src/plasma/test/client_tests.cc | 14 +++----
cpp/src/plasma/test/serialization_tests.cc | 26 +++++++++---
python/pyarrow/plasma.pyx | 4 +-
12 files changed, 165 insertions(+), 104 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index d74c0f4..a683da0 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -54,8 +54,6 @@
namespace plasma {
-using arrow::MutableBuffer;
-
// Number of threads used for memcopy and hash computations.
constexpr int64_t kThreadPoolSize = 8;
constexpr int64_t kBytesInMB = 1 << 20;
@@ -130,7 +128,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec
// Increment the count of the number of objects in the memory-mapped file
// that are being used. The corresponding decrement should happen in
// PlasmaClient::Release.
- auto entry = mmap_table_.find(object->handle.store_fd);
+ auto entry = mmap_table_.find(object->store_fd);
ARROW_CHECK(entry != mmap_table_.end());
ARROW_CHECK(entry->second.count >= 0);
// Update the in_use_object_bytes_.
@@ -149,7 +147,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec
Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
uint8_t* metadata, int64_t metadata_size,
- std::shared_ptr<Buffer>* data) {
+ std::shared_ptr<MutableBuffer>* data) {
ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
<< data_size << " and metadata size " << metadata_size;
RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, metadata_size));
@@ -157,7 +155,10 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
ObjectID id;
PlasmaObject object;
- RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
+ int store_fd;
+ int64_t mmap_size;
+ RETURN_NOT_OK(
+ ReadCreateReply(buffer.data(), buffer.size(), &id, &object, &store_fd, &mmap_size));
// If the CreateReply included an error, then the store will not send a file
// descriptor.
int fd = recv_fd(store_conn_);
@@ -167,9 +168,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
// The metadata should come right after the data.
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
*data = std::make_shared<MutableBuffer>(
- lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
- object.data_offset,
- data_size);
+ lookup_or_mmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
// If plasma_create is being called from a transfer, then we will not copy the
// metadata here. The metadata will be written along with the data streamed
// from the transfer.
@@ -209,7 +208,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
ARROW_CHECK(object_entry->second->is_sealed)
<< "Plasma client called get on an unsealed object that it created";
PlasmaObject* object = &object_entry->second->object;
- uint8_t* data = lookup_mmapped_file(object->handle.store_fd);
+ uint8_t* data = lookup_mmapped_file(object->store_fd);
object_buffers[i].data =
std::make_shared<Buffer>(data + object->data_offset, object->data_size);
object_buffers[i].metadata = std::make_shared<Buffer>(
@@ -236,8 +235,19 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
std::vector<ObjectID> received_object_ids(num_objects);
std::vector<PlasmaObject> object_data(num_objects);
PlasmaObject* object;
+ std::vector<int> store_fds;
+ std::vector<int64_t> mmap_sizes;
RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(),
- object_data.data(), num_objects));
+ object_data.data(), num_objects, store_fds, mmap_sizes));
+
+ // We mmap all of the file descriptors here so that we can look them up
+ // in the subsequent loop based on just the store file descriptor, without
+ // having to know the relevant file descriptor received from recv_fd.
+ for (size_t i = 0; i < store_fds.size(); i++) {
+ int fd = recv_fd(store_conn_);
+ ARROW_CHECK(fd >= 0);
+ lookup_or_mmap(fd, store_fds[i], mmap_sizes[i]);
+ }
for (int i = 0; i < num_objects; ++i) {
DCHECK(received_object_ids[i] == object_ids[i]);
@@ -246,12 +256,6 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
// If the object was already in use by the client, then the store should
// have returned it.
DCHECK_NE(object->data_size, -1);
- // We won't use this file descriptor, but the store sent us one, so we
- // need to receive it and then close it right away so we don't leak file
- // descriptors.
- int fd = recv_fd(store_conn_);
- close(fd);
- ARROW_CHECK(fd >= 0);
// We've already filled out the information for this object, so we can
// just continue.
continue;
@@ -259,12 +263,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
// If we are here, the object was not currently in use, so we need to
// process the reply from the object store.
if (object->data_size != -1) {
- // The object was retrieved. The user will be responsible for releasing
- // this object.
- int fd = recv_fd(store_conn_);
- uint8_t* data =
- lookup_or_mmap(fd, object->handle.store_fd, object->handle.mmap_size);
- ARROW_CHECK(fd >= 0);
+ uint8_t* data = lookup_mmapped_file(object->store_fd);
// Finish filling out the return values.
object_buffers[i].data =
std::make_shared<Buffer>(data + object->data_offset, object->data_size);
@@ -296,7 +295,7 @@ Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
// Decrement the count of the number of objects in this memory-mapped file
// that the client is using. The corresponding increment should have
// happened in plasma_get.
- int fd = object_entry->second->object.handle.store_fd;
+ int fd = object_entry->second->object.store_fd;
auto entry = mmap_table_.find(fd);
ARROW_CHECK(entry != mmap_table_.end());
ARROW_CHECK(entry->second.count >= 1);
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 35182f8..a1e10a9 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -31,8 +31,9 @@
#include "arrow/util/visibility.h"
#include "plasma/common.h"
-using arrow::Status;
using arrow::Buffer;
+using arrow::MutableBuffer;
+using arrow::Status;
namespace plasma {
@@ -115,7 +116,7 @@ class ARROW_EXPORT PlasmaClient {
/// will be written here.
/// \return The return status.
Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata,
- int64_t metadata_size, std::shared_ptr<Buffer>* data);
+ int64_t metadata_size, std::shared_ptr<MutableBuffer>* data);
/// Get some objects from the Plasma Store. This function will block until the
/// objects have all been created and sealed in the Plasma Store or the
/// timeout
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index ea6dc8b..33803f7 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -89,8 +89,6 @@ struct PlasmaObjectSpec {
// Index of the memory segment (= memory mapped file) that
// this object is allocated in.
segment_index: int;
- // Size in bytes of this segment (needed to call mmap).
- mmap_size: ulong;
// The offset in bytes in the memory mapped file of the data.
data_offset: ulong;
// The size in bytes of the data.
@@ -117,6 +115,12 @@ table PlasmaCreateReply {
plasma_object: PlasmaObjectSpec;
// Error that occurred for this call.
error: PlasmaError;
+ // The file descriptor in the store that corresponds to the file descriptor
+ // being sent to the client right after this message.
+ store_fd: int;
+ // The size in bytes of the segment for the store file descriptor (needed to
+ // call mmap).
+ mmap_size: long;
}
table PlasmaAbortRequest {
@@ -156,9 +160,17 @@ table PlasmaGetReply {
// objects if not all requested objects are stored and sealed
// in the local Plasma store.
object_ids: [string];
- // Plasma object information, in the same order as their IDs.
+ // Plasma object information, in the same order as their IDs. The number of
+ // elements in both object_ids and plasma_objects arrays must agree.
plasma_objects: [PlasmaObjectSpec];
- // The number of elements in both object_ids and plasma_objects arrays must agree.
+ // A list of the file descriptors in the store that correspond to the file
+ // descriptors being sent to the client. The length of this list is the number
+ // of file descriptors that the store will send to the client after this
+ // message.
+ store_fds: [int];
+ // Size in bytes of the segment for each store file descriptor (needed to call
+ // mmap). This list must have the same length as store_fds.
+ mmap_sizes: [long];
}
table PlasmaReleaseRequest {
diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc
index 52d3620..3c5d107 100644
--- a/cpp/src/plasma/malloc.cc
+++ b/cpp/src/plasma/malloc.cc
@@ -197,4 +197,14 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offse
*offset = 0;
}
+int64_t get_mmap_size(int fd) {
+ for (const auto& entry : mmap_records) {
+ if (entry.second.fd == fd) {
+ return entry.second.size;
+ }
+ }
+ ARROW_LOG(FATAL) << "failed to find entry in mmap_records for fd " << fd;
+ return -1; // This code is never reached.
+}
+
void set_malloc_granularity(int value) { change_mparam(M_GRANULARITY, value); }
diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h
index 0df720d..cb8c600 100644
--- a/cpp/src/plasma/malloc.h
+++ b/cpp/src/plasma/malloc.h
@@ -23,6 +23,12 @@
void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);
+/// Get the mmap size corresponding to a specific file descriptor.
+///
+/// @param fd The file descriptor to look up.
+/// @return The size of the corresponding memory-mapped file.
+int64_t get_mmap_size(int fd);
+
void set_malloc_granularity(int value);
#endif // MALLOC_H
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 603ff8a..2d07c91 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -64,20 +64,12 @@ struct Client;
/// Mapping from object IDs to type and status of the request.
typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher> ObjectRequestMap;
-/// Handle to access memory mapped file and map it into client address space.
-struct object_handle {
+// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
+struct PlasmaObject {
/// The file descriptor of the memory mapped file in the store. It is used as
/// a unique identifier of the file in the client to look up the corresponding
/// file descriptor on the client's side.
int store_fd;
- /// The size in bytes of the memory mapped file.
- int64_t mmap_size;
-};
-
-// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
-struct PlasmaObject {
- /// Handle for memory mapped file the object is stored in.
- object_handle handle;
/// The offset in bytes in the memory mapped file of the data.
ptrdiff_t data_offset;
/// The offset in bytes in the memory mapped file of the metadata.
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index c0ebb88..6c0bc0c 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -73,30 +73,32 @@ Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
return Status::OK();
}
-Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
- int error_code) {
+Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error_code,
+ int64_t mmap_size) {
flatbuffers::FlatBufferBuilder fbb;
- PlasmaObjectSpec plasma_object(object->handle.store_fd, object->handle.mmap_size,
- object->data_offset, object->data_size,
+ PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset, object->data_size,
object->metadata_offset, object->metadata_size);
- auto message =
- CreatePlasmaCreateReply(fbb, fbb.CreateString(object_id.binary()), &plasma_object,
- static_cast<PlasmaError>(error_code));
+ auto message = CreatePlasmaCreateReply(
+ fbb, fbb.CreateString(object_id.binary()), &plasma_object,
+ static_cast<PlasmaError>(error_code), object->store_fd, mmap_size);
return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
}
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
- PlasmaObject* object) {
+ PlasmaObject* object, int* store_fd, int64_t* mmap_size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
- object->handle.store_fd = message->plasma_object()->segment_index();
- object->handle.mmap_size = message->plasma_object()->mmap_size();
+ object->store_fd = message->plasma_object()->segment_index();
object->data_offset = message->plasma_object()->data_offset();
object->data_size = message->plasma_object()->data_size();
object->metadata_offset = message->plasma_object()->metadata_offset();
object->metadata_size = message->plasma_object()->metadata_size();
+
+ *store_fd = message->store_fd();
+ *mmap_size = message->mmap_size();
+
return plasma_error_status(message->error());
}
@@ -389,24 +391,29 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_
Status SendGetReply(
int sock, ObjectID object_ids[],
std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
- int64_t num_objects) {
+ int64_t num_objects, const std::vector<int>& store_fds,
+ const std::vector<int64_t>& mmap_sizes) {
flatbuffers::FlatBufferBuilder fbb;
std::vector<PlasmaObjectSpec> objects;
- for (int i = 0; i < num_objects; ++i) {
+ ARROW_CHECK(store_fds.size() == mmap_sizes.size());
+
+ for (int64_t i = 0; i < num_objects; ++i) {
const PlasmaObject& object = plasma_objects[object_ids[i]];
- objects.push_back(PlasmaObjectSpec(object.handle.store_fd, object.handle.mmap_size,
- object.data_offset, object.data_size,
- object.metadata_offset, object.metadata_size));
+ objects.push_back(PlasmaObjectSpec(object.store_fd, object.data_offset,
+ object.data_size, object.metadata_offset,
+ object.metadata_size));
}
auto message =
CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
- fbb.CreateVectorOfStructs(objects.data(), num_objects));
+ fbb.CreateVectorOfStructs(objects.data(), num_objects),
+ fbb.CreateVector(store_fds), fbb.CreateVector(mmap_sizes));
return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
}
Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
- PlasmaObject plasma_objects[], int64_t num_objects) {
+ PlasmaObject plasma_objects[], int64_t num_objects,
+ std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
@@ -415,13 +422,17 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
}
for (uoffset_t i = 0; i < num_objects; ++i) {
const PlasmaObjectSpec* object = message->plasma_objects()->Get(i);
- plasma_objects[i].handle.store_fd = object->segment_index();
- plasma_objects[i].handle.mmap_size = object->mmap_size();
+ plasma_objects[i].store_fd = object->segment_index();
plasma_objects[i].data_offset = object->data_offset();
plasma_objects[i].data_size = object->data_size();
plasma_objects[i].metadata_offset = object->metadata_offset();
plasma_objects[i].metadata_size = object->metadata_size();
}
+ ARROW_CHECK(message->store_fds()->size() == message->mmap_sizes()->size());
+ for (uoffset_t i = 0; i < message->store_fds()->size(); i++) {
+ store_fds.push_back(message->store_fds()->Get(i));
+ mmap_sizes.push_back(message->mmap_sizes()->Get(i));
+ }
return Status::OK();
}
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index e8c334f..44263a6 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -46,10 +46,11 @@ Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
int64_t* data_size, int64_t* metadata_size);
-Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error);
+Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error,
+ int64_t mmap_size);
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
- PlasmaObject* object);
+ PlasmaObject* object, int* store_fd, int64_t* mmap_size);
Status SendAbortRequest(int sock, ObjectID object_id);
@@ -81,10 +82,12 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_
Status SendGetReply(
int sock, ObjectID object_ids[],
std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
- int64_t num_objects);
+ int64_t num_objects, const std::vector<int>& store_fds,
+ const std::vector<int64_t>& mmap_sizes);
Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
- PlasmaObject plasma_objects[], int64_t num_objects);
+ PlasmaObject plasma_objects[], int64_t num_objects,
+ std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes);
/* Plasma Release message functions. */
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index dde7f9c..80dd525 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -192,8 +192,7 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
entry->state = PLASMA_CREATED;
store_info_.objects[object_id] = std::move(entry);
- result->handle.store_fd = fd;
- result->handle.mmap_size = map_size;
+ result->store_fd = fd;
result->data_offset = offset;
result->metadata_offset = offset + data_size;
result->data_size = data_size;
@@ -211,8 +210,7 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
DCHECK(object != NULL);
DCHECK(entry != NULL);
DCHECK(entry->state == PLASMA_SEALED);
- object->handle.store_fd = entry->fd;
- object->handle.mmap_size = entry->map_size;
+ object->store_fd = entry->fd;
object->data_offset = entry->offset;
object->metadata_offset = entry->offset + entry->info.data_size;
object->data_size = entry->info.data_size;
@@ -220,34 +218,44 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
}
void PlasmaStore::return_from_get(GetRequest* get_req) {
+ // Figure out how many file descriptors we need to send.
+ std::unordered_set<int> fds_to_send;
+ std::vector<int> store_fds;
+ std::vector<int64_t> mmap_sizes;
+ for (const auto& object_id : get_req->object_ids) {
+ PlasmaObject& object = get_req->objects[object_id];
+ int fd = object.store_fd;
+ if (object.data_size != -1 && fds_to_send.count(fd) == 0) {
+ fds_to_send.insert(fd);
+ store_fds.push_back(fd);
+ mmap_sizes.push_back(get_mmap_size(fd));
+ }
+ }
+
// Send the get reply to the client.
Status s = SendGetReply(get_req->client->fd, &get_req->object_ids[0], get_req->objects,
- get_req->object_ids.size());
+ get_req->object_ids.size(), store_fds, mmap_sizes);
warn_if_sigpipe(s.ok() ? 0 : -1, get_req->client->fd);
// If we successfully sent the get reply message to the client, then also send
// the file descriptors.
if (s.ok()) {
// Send all of the file descriptors for the present objects.
- for (const auto& object_id : get_req->object_ids) {
- PlasmaObject& object = get_req->objects[object_id];
- // We use the data size to indicate whether the object is present or not.
- if (object.data_size != -1) {
- int error_code = send_fd(get_req->client->fd, object.handle.store_fd);
- // If we failed to send the file descriptor, loop until we have sent it
- // successfully. TODO(rkn): This is problematic for two reasons. First
- // of all, sending the file descriptor should just succeed without any
- // errors, but sometimes I see a "Message too long" error number.
- // Second, looping like this allows a client to potentially block the
- // plasma store event loop which should never happen.
- while (error_code < 0) {
- if (errno == EMSGSIZE) {
- ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying.";
- error_code = send_fd(get_req->client->fd, object.handle.store_fd);
- continue;
- }
- warn_if_sigpipe(error_code, get_req->client->fd);
- break;
+ for (int store_fd : store_fds) {
+ int error_code = send_fd(get_req->client->fd, store_fd);
+ // If we failed to send the file descriptor, loop until we have sent it
+ // successfully. TODO(rkn): This is problematic for two reasons. First
+ // of all, sending the file descriptor should just succeed without any
+ // errors, but sometimes I see a "Message too long" error number.
+ // Second, looping like this allows a client to potentially block the
+ // plasma store event loop which should never happen.
+ while (error_code < 0) {
+ if (errno == EMSGSIZE) {
+ ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying.";
+ error_code = send_fd(get_req->client->fd, store_fd);
+ continue;
}
+ warn_if_sigpipe(error_code, get_req->client->fd);
+ break;
}
}
}
@@ -640,10 +648,15 @@ Status PlasmaStore::process_message(Client* client) {
ReadCreateRequest(input, input_size, &object_id, &data_size, &metadata_size));
int error_code =
create_object(object_id, data_size, metadata_size, client, &object);
- HANDLE_SIGPIPE(SendCreateReply(client->fd, object_id, &object, error_code),
- client->fd);
+ int64_t mmap_size = 0;
+ if (error_code == PlasmaError_OK) {
+ mmap_size = get_mmap_size(object.store_fd);
+ }
+ HANDLE_SIGPIPE(
+ SendCreateReply(client->fd, object_id, &object, error_code, mmap_size),
+ client->fd);
if (error_code == PlasmaError_OK) {
- warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd);
+ warn_if_sigpipe(send_fd(client->fd, object.store_fd), client->fd);
}
} break;
case MessageType_PlasmaAbortRequest: {
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index f19c2bf..63b5693 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -70,7 +70,7 @@ TEST_F(TestPlasmaStore, DeleteTest) {
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- std::shared_ptr<Buffer> data;
+ std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id));
@@ -96,7 +96,7 @@ TEST_F(TestPlasmaStore, ContainsTest) {
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- std::shared_ptr<Buffer> data;
+ std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id));
// Avoid race condition of Plasma Manager waiting for notification.
@@ -119,7 +119,7 @@ TEST_F(TestPlasmaStore, GetTest) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- std::shared_ptr<Buffer> data_buffer;
+ std::shared_ptr<MutableBuffer> data_buffer;
uint8_t* data;
ARROW_CHECK_OK(
client_.Create(object_id, data_size, metadata, metadata_size, &data_buffer));
@@ -145,7 +145,7 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- std::shared_ptr<Buffer> data;
+ std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
data->mutable_data()[0] = 1;
ARROW_CHECK_OK(client_.Seal(object_id1));
@@ -172,7 +172,7 @@ TEST_F(TestPlasmaStore, AbortTest) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- std::shared_ptr<Buffer> data;
+ std::shared_ptr<MutableBuffer> data;
uint8_t* data_ptr;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
data_ptr = data->mutable_data();
@@ -220,7 +220,7 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- std::shared_ptr<Buffer> data;
+ std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client2_.Seal(object_id));
// Test that the first client can get the object.
@@ -260,7 +260,7 @@ TEST_F(TestPlasmaStore, ManyObjectTest) {
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- std::shared_ptr<Buffer> data;
+ std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
if (i % 3 == 0) {
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index b593b6a..656b2cc 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -63,8 +63,7 @@ PlasmaObject random_plasma_object(void) {
int random = rand_r(&seed);
PlasmaObject object;
memset(&object, 0, sizeof(object));
- object.handle.store_fd = random + 7;
- object.handle.mmap_size = random + 42;
+ object.store_fd = random + 7;
object.data_offset = random + 1;
object.metadata_offset = random + 2;
object.data_size = random + 3;
@@ -94,13 +93,19 @@ TEST(PlasmaSerialization, CreateReply) {
int fd = create_temp_file();
ObjectID object_id1 = ObjectID::from_random();
PlasmaObject object1 = random_plasma_object();
- ARROW_CHECK_OK(SendCreateReply(fd, object_id1, &object1, 0));
+ int64_t mmap_size1 = 1000000;
+ ARROW_CHECK_OK(SendCreateReply(fd, object_id1, &object1, 0, mmap_size1));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaCreateReply);
ObjectID object_id2;
PlasmaObject object2;
memset(&object2, 0, sizeof(object2));
- ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2));
+ int store_fd;
+ int64_t mmap_size2;
+ ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2,
+ &store_fd, &mmap_size2));
ASSERT_EQ(object_id1, object_id2);
+ ASSERT_EQ(object1.store_fd, store_fd);
+ ASSERT_EQ(mmap_size1, mmap_size2);
ASSERT_EQ(memcmp(&object1, &object2, sizeof(object1)), 0);
close(fd);
}
@@ -158,13 +163,20 @@ TEST(PlasmaSerialization, GetReply) {
std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> plasma_objects;
plasma_objects[object_ids[0]] = random_plasma_object();
plasma_objects[object_ids[1]] = random_plasma_object();
- ARROW_CHECK_OK(SendGetReply(fd, object_ids, plasma_objects, 2));
+ std::vector<int> store_fds = {1, 2, 3};
+ std::vector<int64_t> mmap_sizes = {100, 200, 300};
+ ARROW_CHECK_OK(SendGetReply(fd, object_ids, plasma_objects, 2, store_fds, mmap_sizes));
+
std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaGetReply);
ObjectID object_ids_return[2];
PlasmaObject plasma_objects_return[2];
+ std::vector<int> store_fds_return;
+ std::vector<int64_t> mmap_sizes_return;
memset(&plasma_objects_return, 0, sizeof(plasma_objects_return));
ARROW_CHECK_OK(ReadGetReply(data.data(), data.size(), object_ids_return,
- &plasma_objects_return[0], 2));
+ &plasma_objects_return[0], 2, store_fds_return,
+ mmap_sizes_return));
+
ASSERT_EQ(object_ids[0], object_ids_return[0]);
ASSERT_EQ(object_ids[1], object_ids_return[1]);
ASSERT_EQ(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0],
@@ -173,6 +185,8 @@ TEST(PlasmaSerialization, GetReply) {
ASSERT_EQ(memcmp(&plasma_objects[object_ids[1]], &plasma_objects_return[1],
sizeof(PlasmaObject)),
0);
+ ASSERT_TRUE(store_fds == store_fds_return);
+ ASSERT_TRUE(mmap_sizes == mmap_sizes_return);
close(fd);
}
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 32f6d18..801d094 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -81,7 +81,7 @@ cdef extern from "plasma/client.h" nogil:
CStatus Create(const CUniqueID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
- const shared_ptr[CBuffer]* data)
+ const shared_ptr[CMutableBuffer]* data)
CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
int64_t timeout_ms, CObjectBuffer* object_buffers)
@@ -297,7 +297,7 @@ cdef class PlasmaClient:
not be created because the plasma store is unable to evict
enough objects to create room for it.
"""
- cdef shared_ptr[CBuffer] data
+ cdef shared_ptr[CMutableBuffer] data
with nogil:
check_status(self.client.get().Create(object_id.data, data_size,
<uint8_t*>(metadata.data()),
--
To stop receiving notification emails like this one, please contact
['"commits@arrow.apache.org" <co...@arrow.apache.org>'].