You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/01/20 21:41:32 UTC

[arrow] branch master updated: ARROW-2000: [Plasma] Deduplicate file descriptors when replying to GetRequest.

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d135974  ARROW-2000: [Plasma] Deduplicate file descriptors when replying to GetRequest.
d135974 is described below

commit d135974a0d3dd9a9fbbb10da4c5dbc65f9324234
Author: Robert Nishihara <ro...@gmail.com>
AuthorDate: Sat Jan 20 13:41:23 2018 -0800

    ARROW-2000: [Plasma] Deduplicate file descriptors when replying to GetRequest.
    
    Author: Robert Nishihara <ro...@gmail.com>
    
    Closes #1479 from robertnishihara/deduplicatefiledescriptors and squashes the following commits:
    
    9be9643 [Robert Nishihara] Fix bug.
    8a827cf [Robert Nishihara] Remove mmap_size from PlasmaObject.
    ab30d7d [Robert Nishihara] Fix tests.
    2916e87 [Robert Nishihara] Remove mmap_size from PlasmaObjectSpec, and file_descriptor -> fd.
    7f5c618 [Robert Nishihara] Deduplicate file descriptors when store replies to Get.
    ab12d63 [Robert Nishihara] Make Create return a MutableBuffer.
---
 cpp/src/plasma/client.cc                   | 45 ++++++++++----------
 cpp/src/plasma/client.h                    |  5 ++-
 cpp/src/plasma/format/plasma.fbs           | 20 +++++++--
 cpp/src/plasma/malloc.cc                   | 10 +++++
 cpp/src/plasma/malloc.h                    |  6 +++
 cpp/src/plasma/plasma.h                    | 12 +-----
 cpp/src/plasma/protocol.cc                 | 49 +++++++++++++---------
 cpp/src/plasma/protocol.h                  | 11 +++--
 cpp/src/plasma/store.cc                    | 67 ++++++++++++++++++------------
 cpp/src/plasma/test/client_tests.cc        | 14 +++----
 cpp/src/plasma/test/serialization_tests.cc | 26 +++++++++---
 python/pyarrow/plasma.pyx                  |  4 +-
 12 files changed, 165 insertions(+), 104 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index d74c0f4..a683da0 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -54,8 +54,6 @@
 
 namespace plasma {
 
-using arrow::MutableBuffer;
-
 // Number of threads used for memcopy and hash computations.
 constexpr int64_t kThreadPoolSize = 8;
 constexpr int64_t kBytesInMB = 1 << 20;
@@ -130,7 +128,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec
     // Increment the count of the number of objects in the memory-mapped file
     // that are being used. The corresponding decrement should happen in
     // PlasmaClient::Release.
-    auto entry = mmap_table_.find(object->handle.store_fd);
+    auto entry = mmap_table_.find(object->store_fd);
     ARROW_CHECK(entry != mmap_table_.end());
     ARROW_CHECK(entry->second.count >= 0);
     // Update the in_use_object_bytes_.
@@ -149,7 +147,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec
 
 Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
                             uint8_t* metadata, int64_t metadata_size,
-                            std::shared_ptr<Buffer>* data) {
+                            std::shared_ptr<MutableBuffer>* data) {
   ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
                    << data_size << " and metadata size " << metadata_size;
   RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, metadata_size));
@@ -157,7 +155,10 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
   RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
   ObjectID id;
   PlasmaObject object;
-  RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
+  int store_fd;
+  int64_t mmap_size;
+  RETURN_NOT_OK(
+      ReadCreateReply(buffer.data(), buffer.size(), &id, &object, &store_fd, &mmap_size));
   // If the CreateReply included an error, then the store will not send a file
   // descriptor.
   int fd = recv_fd(store_conn_);
@@ -167,9 +168,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
   // The metadata should come right after the data.
   ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
   *data = std::make_shared<MutableBuffer>(
-      lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
-          object.data_offset,
-      data_size);
+      lookup_or_mmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
   // If plasma_create is being called from a transfer, then we will not copy the
   // metadata here. The metadata will be written along with the data streamed
   // from the transfer.
@@ -209,7 +208,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
       ARROW_CHECK(object_entry->second->is_sealed)
           << "Plasma client called get on an unsealed object that it created";
       PlasmaObject* object = &object_entry->second->object;
-      uint8_t* data = lookup_mmapped_file(object->handle.store_fd);
+      uint8_t* data = lookup_mmapped_file(object->store_fd);
       object_buffers[i].data =
           std::make_shared<Buffer>(data + object->data_offset, object->data_size);
       object_buffers[i].metadata = std::make_shared<Buffer>(
@@ -236,8 +235,19 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
   std::vector<ObjectID> received_object_ids(num_objects);
   std::vector<PlasmaObject> object_data(num_objects);
   PlasmaObject* object;
+  std::vector<int> store_fds;
+  std::vector<int64_t> mmap_sizes;
   RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(),
-                             object_data.data(), num_objects));
+                             object_data.data(), num_objects, store_fds, mmap_sizes));
+
+  // We mmap all of the file descriptors here so that, in the subsequent loop,
+  // we can look them up based on just the store file descriptor, without
+  // having to know the corresponding file descriptor received from recv_fd.
+  for (size_t i = 0; i < store_fds.size(); i++) {
+    int fd = recv_fd(store_conn_);
+    ARROW_CHECK(fd >= 0);
+    lookup_or_mmap(fd, store_fds[i], mmap_sizes[i]);
+  }
 
   for (int i = 0; i < num_objects; ++i) {
     DCHECK(received_object_ids[i] == object_ids[i]);
@@ -246,12 +256,6 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
       // If the object was already in use by the client, then the store should
       // have returned it.
       DCHECK_NE(object->data_size, -1);
-      // We won't use this file descriptor, but the store sent us one, so we
-      // need to receive it and then close it right away so we don't leak file
-      // descriptors.
-      int fd = recv_fd(store_conn_);
-      close(fd);
-      ARROW_CHECK(fd >= 0);
       // We've already filled out the information for this object, so we can
       // just continue.
       continue;
@@ -259,12 +263,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
     // If we are here, the object was not currently in use, so we need to
     // process the reply from the object store.
     if (object->data_size != -1) {
-      // The object was retrieved. The user will be responsible for releasing
-      // this object.
-      int fd = recv_fd(store_conn_);
-      uint8_t* data =
-          lookup_or_mmap(fd, object->handle.store_fd, object->handle.mmap_size);
-      ARROW_CHECK(fd >= 0);
+      uint8_t* data = lookup_mmapped_file(object->store_fd);
       // Finish filling out the return values.
       object_buffers[i].data =
           std::make_shared<Buffer>(data + object->data_offset, object->data_size);
@@ -296,7 +295,7 @@ Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
   // Decrement the count of the number of objects in this memory-mapped file
   // that the client is using. The corresponding increment should have
   // happened in plasma_get.
-  int fd = object_entry->second->object.handle.store_fd;
+  int fd = object_entry->second->object.store_fd;
   auto entry = mmap_table_.find(fd);
   ARROW_CHECK(entry != mmap_table_.end());
   ARROW_CHECK(entry->second.count >= 1);
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 35182f8..a1e10a9 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -31,8 +31,9 @@
 #include "arrow/util/visibility.h"
 #include "plasma/common.h"
 
-using arrow::Status;
 using arrow::Buffer;
+using arrow::MutableBuffer;
+using arrow::Status;
 
 namespace plasma {
 
@@ -115,7 +116,7 @@ class ARROW_EXPORT PlasmaClient {
   ///        will be written here.
   /// \return The return status.
   Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata,
-                int64_t metadata_size, std::shared_ptr<Buffer>* data);
+                int64_t metadata_size, std::shared_ptr<MutableBuffer>* data);
   /// Get some objects from the Plasma Store. This function will block until the
   /// objects have all been created and sealed in the Plasma Store or the
   /// timeout
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index ea6dc8b..33803f7 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -89,8 +89,6 @@ struct PlasmaObjectSpec {
   // Index of the memory segment (= memory mapped file) that
   // this object is allocated in.
   segment_index: int;
-  // Size in bytes of this segment (needed to call mmap).
-  mmap_size: ulong;
   // The offset in bytes in the memory mapped file of the data.
   data_offset: ulong;
   // The size in bytes of the data.
@@ -117,6 +115,12 @@ table PlasmaCreateReply {
   plasma_object: PlasmaObjectSpec;
   // Error that occurred for this call.
   error: PlasmaError;
+  // The file descriptor in the store that corresponds to the file descriptor
+  // being sent to the client right after this message.
+  store_fd: int;
+  // The size in bytes of the segment for the store file descriptor (needed to
+  // call mmap).
+  mmap_size: long;
 }
 
 table PlasmaAbortRequest {
@@ -156,9 +160,17 @@ table PlasmaGetReply {
   // objects if not all requested objects are stored and sealed
   // in the local Plasma store.
   object_ids: [string];
-  // Plasma object information, in the same order as their IDs.
+  // Plasma object information, in the same order as their IDs. The number of
+  // elements in both object_ids and plasma_objects arrays must agree.
   plasma_objects: [PlasmaObjectSpec];
-  // The number of elements in both object_ids and plasma_objects arrays must agree.
+  // A list of the file descriptors in the store that correspond to the file
+  // descriptors being sent to the client. The length of this list is the number
+  // of file descriptors that the store will send to the client after this
+  // message.
+  store_fds: [int];
+  // Size in bytes of the segment for each store file descriptor (needed to call
+  // mmap). This list must have the same length as store_fds.
+  mmap_sizes: [long];
 }
 
 table PlasmaReleaseRequest {
diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc
index 52d3620..3c5d107 100644
--- a/cpp/src/plasma/malloc.cc
+++ b/cpp/src/plasma/malloc.cc
@@ -197,4 +197,14 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offse
   *offset = 0;
 }
 
+int64_t get_mmap_size(int fd) {
+  for (const auto& entry : mmap_records) {
+    if (entry.second.fd == fd) {
+      return entry.second.size;
+    }
+  }
+  ARROW_LOG(FATAL) << "failed to find entry in mmap_records for fd " << fd;
+  return -1;  // This code is never reached.
+}
+
 void set_malloc_granularity(int value) { change_mparam(M_GRANULARITY, value); }
diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h
index 0df720d..cb8c600 100644
--- a/cpp/src/plasma/malloc.h
+++ b/cpp/src/plasma/malloc.h
@@ -23,6 +23,12 @@
 
 void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);
 
+/// Get the mmap size corresponding to a specific file descriptor.
+///
+/// @param fd The file descriptor to look up.
+/// @return The size of the corresponding memory-mapped file.
+int64_t get_mmap_size(int fd);
+
 void set_malloc_granularity(int value);
 
 #endif  // MALLOC_H
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 603ff8a..2d07c91 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -64,20 +64,12 @@ struct Client;
 /// Mapping from object IDs to type and status of the request.
 typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher> ObjectRequestMap;
 
-/// Handle to access memory mapped file and map it into client address space.
-struct object_handle {
+// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
+struct PlasmaObject {
   /// The file descriptor of the memory mapped file in the store. It is used as
   /// a unique identifier of the file in the client to look up the corresponding
   /// file descriptor on the client's side.
   int store_fd;
-  /// The size in bytes of the memory mapped file.
-  int64_t mmap_size;
-};
-
-// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
-struct PlasmaObject {
-  /// Handle for memory mapped file the object is stored in.
-  object_handle handle;
   /// The offset in bytes in the memory mapped file of the data.
   ptrdiff_t data_offset;
   /// The offset in bytes in the memory mapped file of the metadata.
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index c0ebb88..6c0bc0c 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -73,30 +73,32 @@ Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
   return Status::OK();
 }
 
-Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
-                       int error_code) {
+Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error_code,
+                       int64_t mmap_size) {
   flatbuffers::FlatBufferBuilder fbb;
-  PlasmaObjectSpec plasma_object(object->handle.store_fd, object->handle.mmap_size,
-                                 object->data_offset, object->data_size,
+  PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset, object->data_size,
                                  object->metadata_offset, object->metadata_size);
-  auto message =
-      CreatePlasmaCreateReply(fbb, fbb.CreateString(object_id.binary()), &plasma_object,
-                              static_cast<PlasmaError>(error_code));
+  auto message = CreatePlasmaCreateReply(
+      fbb, fbb.CreateString(object_id.binary()), &plasma_object,
+      static_cast<PlasmaError>(error_code), object->store_fd, mmap_size);
   return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
 }
 
 Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
-                       PlasmaObject* object) {
+                       PlasmaObject* object, int* store_fd, int64_t* mmap_size) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
   DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
-  object->handle.store_fd = message->plasma_object()->segment_index();
-  object->handle.mmap_size = message->plasma_object()->mmap_size();
+  object->store_fd = message->plasma_object()->segment_index();
   object->data_offset = message->plasma_object()->data_offset();
   object->data_size = message->plasma_object()->data_size();
   object->metadata_offset = message->plasma_object()->metadata_offset();
   object->metadata_size = message->plasma_object()->metadata_size();
+
+  *store_fd = message->store_fd();
+  *mmap_size = message->mmap_size();
+
   return plasma_error_status(message->error());
 }
 
@@ -389,24 +391,29 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_
 Status SendGetReply(
     int sock, ObjectID object_ids[],
     std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
-    int64_t num_objects) {
+    int64_t num_objects, const std::vector<int>& store_fds,
+    const std::vector<int64_t>& mmap_sizes) {
   flatbuffers::FlatBufferBuilder fbb;
   std::vector<PlasmaObjectSpec> objects;
 
-  for (int i = 0; i < num_objects; ++i) {
+  ARROW_CHECK(store_fds.size() == mmap_sizes.size());
+
+  for (int64_t i = 0; i < num_objects; ++i) {
     const PlasmaObject& object = plasma_objects[object_ids[i]];
-    objects.push_back(PlasmaObjectSpec(object.handle.store_fd, object.handle.mmap_size,
-                                       object.data_offset, object.data_size,
-                                       object.metadata_offset, object.metadata_size));
+    objects.push_back(PlasmaObjectSpec(object.store_fd, object.data_offset,
+                                       object.data_size, object.metadata_offset,
+                                       object.metadata_size));
   }
   auto message =
       CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
-                           fbb.CreateVectorOfStructs(objects.data(), num_objects));
+                           fbb.CreateVectorOfStructs(objects.data(), num_objects),
+                           fbb.CreateVector(store_fds), fbb.CreateVector(mmap_sizes));
   return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
 }
 
 Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
-                    PlasmaObject plasma_objects[], int64_t num_objects) {
+                    PlasmaObject plasma_objects[], int64_t num_objects,
+                    std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
   DCHECK(verify_flatbuffer(message, data, size));
@@ -415,13 +422,17 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
   }
   for (uoffset_t i = 0; i < num_objects; ++i) {
     const PlasmaObjectSpec* object = message->plasma_objects()->Get(i);
-    plasma_objects[i].handle.store_fd = object->segment_index();
-    plasma_objects[i].handle.mmap_size = object->mmap_size();
+    plasma_objects[i].store_fd = object->segment_index();
     plasma_objects[i].data_offset = object->data_offset();
     plasma_objects[i].data_size = object->data_size();
     plasma_objects[i].metadata_offset = object->metadata_offset();
     plasma_objects[i].metadata_size = object->metadata_size();
   }
+  ARROW_CHECK(message->store_fds()->size() == message->mmap_sizes()->size());
+  for (uoffset_t i = 0; i < message->store_fds()->size(); i++) {
+    store_fds.push_back(message->store_fds()->Get(i));
+    mmap_sizes.push_back(message->mmap_sizes()->Get(i));
+  }
   return Status::OK();
 }
 
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index e8c334f..44263a6 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -46,10 +46,11 @@ Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
 Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
                          int64_t* data_size, int64_t* metadata_size);
 
-Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error);
+Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error,
+                       int64_t mmap_size);
 
 Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
-                       PlasmaObject* object);
+                       PlasmaObject* object, int* store_fd, int64_t* mmap_size);
 
 Status SendAbortRequest(int sock, ObjectID object_id);
 
@@ -81,10 +82,12 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_
 Status SendGetReply(
     int sock, ObjectID object_ids[],
     std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
-    int64_t num_objects);
+    int64_t num_objects, const std::vector<int>& store_fds,
+    const std::vector<int64_t>& mmap_sizes);
 
 Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
-                    PlasmaObject plasma_objects[], int64_t num_objects);
+                    PlasmaObject plasma_objects[], int64_t num_objects,
+                    std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes);
 
 /* Plasma Release message functions. */
 
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index dde7f9c..80dd525 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -192,8 +192,7 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
   entry->state = PLASMA_CREATED;
 
   store_info_.objects[object_id] = std::move(entry);
-  result->handle.store_fd = fd;
-  result->handle.mmap_size = map_size;
+  result->store_fd = fd;
   result->data_offset = offset;
   result->metadata_offset = offset + data_size;
   result->data_size = data_size;
@@ -211,8 +210,7 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
   DCHECK(object != NULL);
   DCHECK(entry != NULL);
   DCHECK(entry->state == PLASMA_SEALED);
-  object->handle.store_fd = entry->fd;
-  object->handle.mmap_size = entry->map_size;
+  object->store_fd = entry->fd;
   object->data_offset = entry->offset;
   object->metadata_offset = entry->offset + entry->info.data_size;
   object->data_size = entry->info.data_size;
@@ -220,34 +218,44 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
 }
 
 void PlasmaStore::return_from_get(GetRequest* get_req) {
+  // Collect the distinct store file descriptors of present objects to send.
+  std::unordered_set<int> fds_to_send;
+  std::vector<int> store_fds;
+  std::vector<int64_t> mmap_sizes;
+  for (const auto& object_id : get_req->object_ids) {
+    PlasmaObject& object = get_req->objects[object_id];
+    int fd = object.store_fd;
+    if (object.data_size != -1 && fds_to_send.count(fd) == 0) {
+      fds_to_send.insert(fd);
+      store_fds.push_back(fd);
+      mmap_sizes.push_back(get_mmap_size(fd));
+    }
+  }
+
   // Send the get reply to the client.
   Status s = SendGetReply(get_req->client->fd, &get_req->object_ids[0], get_req->objects,
-                          get_req->object_ids.size());
+                          get_req->object_ids.size(), store_fds, mmap_sizes);
   warn_if_sigpipe(s.ok() ? 0 : -1, get_req->client->fd);
   // If we successfully sent the get reply message to the client, then also send
   // the file descriptors.
   if (s.ok()) {
     // Send all of the file descriptors for the present objects.
-    for (const auto& object_id : get_req->object_ids) {
-      PlasmaObject& object = get_req->objects[object_id];
-      // We use the data size to indicate whether the object is present or not.
-      if (object.data_size != -1) {
-        int error_code = send_fd(get_req->client->fd, object.handle.store_fd);
-        // If we failed to send the file descriptor, loop until we have sent it
-        // successfully. TODO(rkn): This is problematic for two reasons. First
-        // of all, sending the file descriptor should just succeed without any
-        // errors, but sometimes I see a "Message too long" error number.
-        // Second, looping like this allows a client to potentially block the
-        // plasma store event loop which should never happen.
-        while (error_code < 0) {
-          if (errno == EMSGSIZE) {
-            ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying.";
-            error_code = send_fd(get_req->client->fd, object.handle.store_fd);
-            continue;
-          }
-          warn_if_sigpipe(error_code, get_req->client->fd);
-          break;
+    for (int store_fd : store_fds) {
+      int error_code = send_fd(get_req->client->fd, store_fd);
+      // If we failed to send the file descriptor, loop until we have sent it
+      // successfully. TODO(rkn): This is problematic for two reasons. First
+      // of all, sending the file descriptor should just succeed without any
+      // errors, but sometimes I see a "Message too long" error number.
+      // Second, looping like this allows a client to potentially block the
+      // plasma store event loop which should never happen.
+      while (error_code < 0) {
+        if (errno == EMSGSIZE) {
+          ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying.";
+          error_code = send_fd(get_req->client->fd, store_fd);
+          continue;
         }
+        warn_if_sigpipe(error_code, get_req->client->fd);
+        break;
       }
     }
   }
@@ -640,10 +648,15 @@ Status PlasmaStore::process_message(Client* client) {
           ReadCreateRequest(input, input_size, &object_id, &data_size, &metadata_size));
       int error_code =
           create_object(object_id, data_size, metadata_size, client, &object);
-      HANDLE_SIGPIPE(SendCreateReply(client->fd, object_id, &object, error_code),
-                     client->fd);
+      int64_t mmap_size = 0;
+      if (error_code == PlasmaError_OK) {
+        mmap_size = get_mmap_size(object.store_fd);
+      }
+      HANDLE_SIGPIPE(
+          SendCreateReply(client->fd, object_id, &object, error_code, mmap_size),
+          client->fd);
       if (error_code == PlasmaError_OK) {
-        warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd);
+        warn_if_sigpipe(send_fd(client->fd, object.store_fd), client->fd);
       }
     } break;
     case MessageType_PlasmaAbortRequest: {
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index f19c2bf..63b5693 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -70,7 +70,7 @@ TEST_F(TestPlasmaStore, DeleteTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
   ARROW_CHECK_OK(client_.Seal(object_id));
 
@@ -96,7 +96,7 @@ TEST_F(TestPlasmaStore, ContainsTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
   ARROW_CHECK_OK(client_.Seal(object_id));
   // Avoid race condition of Plasma Manager waiting for notification.
@@ -119,7 +119,7 @@ TEST_F(TestPlasmaStore, GetTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data_buffer;
+  std::shared_ptr<MutableBuffer> data_buffer;
   uint8_t* data;
   ARROW_CHECK_OK(
       client_.Create(object_id, data_size, metadata, metadata_size, &data_buffer));
@@ -145,7 +145,7 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
   data->mutable_data()[0] = 1;
   ARROW_CHECK_OK(client_.Seal(object_id1));
@@ -172,7 +172,7 @@ TEST_F(TestPlasmaStore, AbortTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   uint8_t* data_ptr;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
   data_ptr = data->mutable_data();
@@ -220,7 +220,7 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
   ARROW_CHECK_OK(client2_.Seal(object_id));
   // Test that the first client can get the object.
@@ -260,7 +260,7 @@ TEST_F(TestPlasmaStore, ManyObjectTest) {
     int64_t data_size = 100;
     uint8_t metadata[] = {5};
     int64_t metadata_size = sizeof(metadata);
-    std::shared_ptr<Buffer> data;
+    std::shared_ptr<MutableBuffer> data;
     ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
 
     if (i % 3 == 0) {
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index b593b6a..656b2cc 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -63,8 +63,7 @@ PlasmaObject random_plasma_object(void) {
   int random = rand_r(&seed);
   PlasmaObject object;
   memset(&object, 0, sizeof(object));
-  object.handle.store_fd = random + 7;
-  object.handle.mmap_size = random + 42;
+  object.store_fd = random + 7;
   object.data_offset = random + 1;
   object.metadata_offset = random + 2;
   object.data_size = random + 3;
@@ -94,13 +93,19 @@ TEST(PlasmaSerialization, CreateReply) {
   int fd = create_temp_file();
   ObjectID object_id1 = ObjectID::from_random();
   PlasmaObject object1 = random_plasma_object();
-  ARROW_CHECK_OK(SendCreateReply(fd, object_id1, &object1, 0));
+  int64_t mmap_size1 = 1000000;
+  ARROW_CHECK_OK(SendCreateReply(fd, object_id1, &object1, 0, mmap_size1));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaCreateReply);
   ObjectID object_id2;
   PlasmaObject object2;
   memset(&object2, 0, sizeof(object2));
-  ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2));
+  int store_fd;
+  int64_t mmap_size2;
+  ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2,
+                                 &store_fd, &mmap_size2));
   ASSERT_EQ(object_id1, object_id2);
+  ASSERT_EQ(object1.store_fd, store_fd);
+  ASSERT_EQ(mmap_size1, mmap_size2);
   ASSERT_EQ(memcmp(&object1, &object2, sizeof(object1)), 0);
   close(fd);
 }
@@ -158,13 +163,20 @@ TEST(PlasmaSerialization, GetReply) {
   std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> plasma_objects;
   plasma_objects[object_ids[0]] = random_plasma_object();
   plasma_objects[object_ids[1]] = random_plasma_object();
-  ARROW_CHECK_OK(SendGetReply(fd, object_ids, plasma_objects, 2));
+  std::vector<int> store_fds = {1, 2, 3};
+  std::vector<int64_t> mmap_sizes = {100, 200, 300};
+  ARROW_CHECK_OK(SendGetReply(fd, object_ids, plasma_objects, 2, store_fds, mmap_sizes));
+
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaGetReply);
   ObjectID object_ids_return[2];
   PlasmaObject plasma_objects_return[2];
+  std::vector<int> store_fds_return;
+  std::vector<int64_t> mmap_sizes_return;
   memset(&plasma_objects_return, 0, sizeof(plasma_objects_return));
   ARROW_CHECK_OK(ReadGetReply(data.data(), data.size(), object_ids_return,
-                              &plasma_objects_return[0], 2));
+                              &plasma_objects_return[0], 2, store_fds_return,
+                              mmap_sizes_return));
+
   ASSERT_EQ(object_ids[0], object_ids_return[0]);
   ASSERT_EQ(object_ids[1], object_ids_return[1]);
   ASSERT_EQ(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0],
@@ -173,6 +185,8 @@ TEST(PlasmaSerialization, GetReply) {
   ASSERT_EQ(memcmp(&plasma_objects[object_ids[1]], &plasma_objects_return[1],
                    sizeof(PlasmaObject)),
             0);
+  ASSERT_TRUE(store_fds == store_fds_return);
+  ASSERT_TRUE(mmap_sizes == mmap_sizes_return);
   close(fd);
 }
 
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 32f6d18..801d094 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -81,7 +81,7 @@ cdef extern from "plasma/client.h" nogil:
 
         CStatus Create(const CUniqueID& object_id, int64_t data_size,
                        const uint8_t* metadata, int64_t metadata_size,
-                       const shared_ptr[CBuffer]* data)
+                       const shared_ptr[CMutableBuffer]* data)
 
         CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
                     int64_t timeout_ms, CObjectBuffer* object_buffers)
@@ -297,7 +297,7 @@ cdef class PlasmaClient:
                 not be created because the plasma store is unable to evict
                 enough objects to create room for it.
         """
-        cdef shared_ptr[CBuffer] data
+        cdef shared_ptr[CMutableBuffer] data
         with nogil:
             check_status(self.client.get().Create(object_id.data, data_size,
                                                   <uint8_t*>(metadata.data()),

-- 
To stop receiving notification emails like this one, please contact
"commits@arrow.apache.org" <co...@arrow.apache.org>.