You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ro...@apache.org on 2019/03/22 05:41:30 UTC

[arrow] branch master updated: ARROW-4983: [Plasma] Unmap memory upon destruction of the PlasmaClient

This is an automated email from the ASF dual-hosted git repository.

robertnishihara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a4e9195  ARROW-4983: [Plasma] Unmap memory upon destruction of the PlasmaClient
a4e9195 is described below

commit a4e9195808c7395bc924f30cba018b3e0fec07fa
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Thu Mar 21 22:41:20 2019 -0700

    ARROW-4983: [Plasma] Unmap memory upon destruction of the PlasmaClient
    
    Author: Philipp Moritz <pc...@gmail.com>
    Author: Robert Nishihara <ro...@gmail.com>
    
    Closes #4001 from pcmoritz/plasma-client-unmap and squashes the following commits:
    
    893bcdc1 <Robert Nishihara> Update client.cc
    784f9afd <Philipp Moritz> add documentation
    7988a85d <Philipp Moritz> add PlasmaMutableBuffer class
    0a6203b5 <Philipp Moritz> fixes
    af51c7ac <Philipp Moritz> add documentation
    53e4b1de <Philipp Moritz> unmap files in plasma client destructor
---
 cpp/src/plasma/client.cc | 88 ++++++++++++++++++++++++++++++++++--------------
 cpp/src/plasma/client.h  |  1 +
 2 files changed, 63 insertions(+), 26 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 80cf718..e88c5ca 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -105,7 +105,7 @@ static std::mutex gpu_mutex;
 // PlasmaBuffer
 
 /// A Buffer class that automatically releases the backing plasma object
-/// when it goes out of scope.
+/// when it goes out of scope. This is returned by Get.
 class ARROW_NO_EXPORT PlasmaBuffer : public Buffer {
  public:
   ~PlasmaBuffer();
@@ -123,6 +123,19 @@ class ARROW_NO_EXPORT PlasmaBuffer : public Buffer {
   ObjectID object_id_;
 };
 
+/// A mutable Buffer class that keeps the backing data alive by holding a
+/// shared pointer to the PlasmaClient::Impl. This is returned by Create.
+/// Release will be called in the associated Seal call.
+class ARROW_NO_EXPORT PlasmaMutableBuffer : public MutableBuffer {
+ public:
+  PlasmaMutableBuffer(std::shared_ptr<PlasmaClient::Impl> client, uint8_t* mutable_data,
+                      int64_t data_size)
+      : MutableBuffer(mutable_data, data_size), client_(client) {}
+
+ private:
+  std::shared_ptr<PlasmaClient::Impl> client_;
+};
+
 // ----------------------------------------------------------------------
 // PlasmaClient::Impl
 
@@ -140,13 +153,47 @@ struct ObjectInUseEntry {
   bool is_sealed;
 };
 
-struct ClientMmapTableEntry {
+class ClientMmapTableEntry {
+ public:
+  ClientMmapTableEntry(int fd, int64_t map_size)
+      : fd_(fd), pointer_(nullptr), length_(0) {
+    // We subtract kMmapRegionsGap from the length that was added
+    // in fake_mmap in malloc.h, to make map_size page-aligned again.
+    length_ = map_size - kMmapRegionsGap;
+    pointer_ = reinterpret_cast<uint8_t*>(
+        mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
+    // TODO(pcm): Don't fail here, instead return a Status.
+    if (pointer_ == MAP_FAILED) {
+      ARROW_LOG(FATAL) << "mmap failed";
+    }
+    close(fd);  // Closing this fd has an effect on performance.
+  }
+
+  ~ClientMmapTableEntry() {
+    // At this point it is safe to unmap the memory, as each PlasmaBuffer
+    // keeps the PlasmaClient (and therefore the ClientMmapTableEntry)
+    // alive until that buffer is destroyed.
+    // We don't need to close the associated file, since it has
+    // already been closed in the constructor.
+    int r = munmap(pointer_, length_);
+    if (r != 0) {
+      ARROW_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno;
+    }
+  }
+
+  uint8_t* pointer() { return pointer_; }
+
+  int fd() { return fd_; }
+
+ private:
   /// The associated file descriptor on the client.
-  int fd;
+  int fd_;
   /// The result of mmap for this file descriptor.
-  uint8_t* pointer;
+  uint8_t* pointer_;
   /// The length of the memory-mapped file.
-  size_t length;
+  size_t length_;
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(ClientMmapTableEntry);
 };
 
 class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
@@ -244,7 +291,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
   /// Table of dlmalloc buffer files that have been memory mapped so far. This
   /// is a hash table mapping a file descriptor to a struct containing the
   /// address of the corresponding memory-mapped file.
-  std::unordered_map<int, ClientMmapTableEntry> mmap_table_;
+  std::unordered_map<int, std::unique_ptr<ClientMmapTableEntry>> mmap_table_;
   /// A hash table of the object IDs that are currently being used by this
   /// client.
   std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>> objects_in_use_;
@@ -277,23 +324,11 @@ PlasmaClient::Impl::~Impl() {}
 uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
   auto entry = mmap_table_.find(store_fd_val);
   if (entry != mmap_table_.end()) {
-    return entry->second.pointer;
+    return entry->second->pointer();
   } else {
-    // We subtract kMmapRegionsGap from the length that was added
-    // in fake_mmap in malloc.h, to make map_size page-aligned again.
-    uint8_t* result = reinterpret_cast<uint8_t*>(mmap(
-        NULL, map_size - kMmapRegionsGap, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
-    // TODO(pcm): Don't fail here, instead return a Status.
-    if (result == MAP_FAILED) {
-      ARROW_LOG(FATAL) << "mmap failed";
-    }
-    close(fd);  // Closing this fd has an effect on performance.
-
-    ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
-    entry.fd = fd;
-    entry.pointer = result;
-    entry.length = map_size;
-    return result;
+    mmap_table_[store_fd_val] =
+        std::unique_ptr<ClientMmapTableEntry>(new ClientMmapTableEntry(fd, map_size));
+    return mmap_table_[store_fd_val]->pointer();
   }
 }
 
@@ -302,7 +337,7 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_
 uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
   auto entry = mmap_table_.find(store_fd_val);
   ARROW_CHECK(entry != mmap_table_.end());
-  return entry->second.pointer;
+  return entry->second->pointer();
 }
 
 bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
@@ -317,7 +352,7 @@ int PlasmaClient::Impl::GetStoreFd(int store_fd) {
     ARROW_CHECK(fd >= 0) << "recv not successful";
     return fd;
   } else {
-    return entry->second.fd;
+    return entry->second->fd();
   }
 }
 
@@ -369,8 +404,9 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
     ARROW_CHECK(object.metadata_size == metadata_size);
     // The metadata should come right after the data.
     ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
-    *data = std::make_shared<MutableBuffer>(
-        LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
+    *data = std::make_shared<PlasmaMutableBuffer>(
+        shared_from_this(), LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset,
+        data_size);
     // If plasma_create is being called from a transfer, then we will not copy the
     // metadata here. The metadata will be written along with the data streamed
     // from the transfer.
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 8d1d6a7..facfd37 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -257,6 +257,7 @@ class ARROW_EXPORT PlasmaClient {
 
  private:
   friend class PlasmaBuffer;
+  friend class PlasmaMutableBuffer;
   FRIEND_TEST(TestPlasmaStore, GetTest);
   FRIEND_TEST(TestPlasmaStore, LegacyGetTest);
   FRIEND_TEST(TestPlasmaStore, AbortTest);