You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ro...@apache.org on 2019/03/22 05:41:30 UTC
[arrow] branch master updated: ARROW-4983: [Plasma] Unmap memory upon destruction of the PlasmaClient
This is an automated email from the ASF dual-hosted git repository.
robertnishihara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a4e9195 ARROW-4983: [Plasma] Unmap memory upon destruction of the PlasmaClient
a4e9195 is described below
commit a4e9195808c7395bc924f30cba018b3e0fec07fa
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Thu Mar 21 22:41:20 2019 -0700
ARROW-4983: [Plasma] Unmap memory upon destruction of the PlasmaClient
Author: Philipp Moritz <pc...@gmail.com>
Author: Robert Nishihara <ro...@gmail.com>
Closes #4001 from pcmoritz/plasma-client-unmap and squashes the following commits:
893bcdc1 <Robert Nishihara> Update client.cc
784f9afd <Philipp Moritz> add documentation
7988a85d <Philipp Moritz> add PlasmaMutableBuffer class
0a6203b5 <Philipp Moritz> fixes
af51c7ac <Philipp Moritz> add documentation
53e4b1de <Philipp Moritz> unmap files in plasma client destructor
---
cpp/src/plasma/client.cc | 88 ++++++++++++++++++++++++++++++++++--------------
cpp/src/plasma/client.h | 1 +
2 files changed, 63 insertions(+), 26 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 80cf718..e88c5ca 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -105,7 +105,7 @@ static std::mutex gpu_mutex;
// PlasmaBuffer
/// A Buffer class that automatically releases the backing plasma object
-/// when it goes out of scope.
+/// when it goes out of scope. This is returned by Get.
class ARROW_NO_EXPORT PlasmaBuffer : public Buffer {
public:
~PlasmaBuffer();
@@ -123,6 +123,19 @@ class ARROW_NO_EXPORT PlasmaBuffer : public Buffer {
ObjectID object_id_;
};
+/// A mutable Buffer returned by Create. It holds a shared pointer to the
+/// PlasmaClient::Impl that owns the memory mapping (via its mmap table), so
+/// the backing data cannot be unmapped while this buffer is alive. Release
+/// will be called in the associated Seal call.
+class ARROW_NO_EXPORT PlasmaMutableBuffer : public MutableBuffer {
+ public:
+ // `client` is taken by value and moved into place (sink parameter), which
+ // avoids an extra atomic reference-count increment/decrement pair.
+ PlasmaMutableBuffer(std::shared_ptr<PlasmaClient::Impl> client, uint8_t* mutable_data,
+ int64_t data_size)
+ : MutableBuffer(mutable_data, data_size), client_(std::move(client)) {}
+
+ private:
+ std::shared_ptr<PlasmaClient::Impl> client_;
+};
+
// ----------------------------------------------------------------------
// PlasmaClient::Impl
@@ -140,13 +153,47 @@ struct ObjectInUseEntry {
bool is_sealed;
};
-struct ClientMmapTableEntry {
+/// Owns one client-side memory mapping of a plasma store file: the file is
+/// mapped in the constructor and unmapped in the destructor. Not copyable.
+class ClientMmapTableEntry {
+ public:
+ /// \param fd file descriptor received from the store; it is closed by this
+ /// constructor once the mapping has been created.
+ /// \param map_size mapping size reported by the store, which still includes
+ /// kMmapRegionsGap.
+ ClientMmapTableEntry(int fd, int64_t map_size)
+ : fd_(fd), pointer_(nullptr), length_(0) {
+ // We subtract kMmapRegionsGap from the length that was added
+ // in fake_mmap in malloc.h, to make map_size page-aligned again.
+ length_ = static_cast<size_t>(map_size - kMmapRegionsGap);
+ pointer_ = reinterpret_cast<uint8_t*>(
+ mmap(nullptr, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
+ // TODO(pcm): Don't fail here, instead return a Status.
+ if (pointer_ == MAP_FAILED) {
+ ARROW_LOG(FATAL) << "mmap failed";
+ }
+ // The mapping stays valid after its descriptor is closed: mmap keeps its own
+ // reference to the file. Closing this fd has an effect on performance.
+ close(fd);
+ }
+
+ ~ClientMmapTableEntry() {
+ // At this point it is safe to unmap the memory: the buffers handed out by
+ // the client (PlasmaBuffer from Get, PlasmaMutableBuffer from Create) keep
+ // the client, and therefore this entry, alive until they are destroyed.
+ // We don't need to close the associated file, since it has
+ // already been closed in the constructor.
+ int r = munmap(pointer_, length_);
+ if (r != 0) {
+ ARROW_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno;
+ }
+ }
+
+ /// Start address of the mapping.
+ uint8_t* pointer() const { return pointer_; }
+
+ /// Descriptor number the mapping was created from (already closed).
+ int fd() const { return fd_; }
+
+ private:
-/// The associated file descriptor on the client.
- int fd;
+ /// The associated file descriptor on the client. It is closed in the
+ /// constructor, so only the descriptor number is retained.
+ int fd_;
/// The result of mmap for this file descriptor.
- uint8_t* pointer;
+ uint8_t* pointer_;
-/// The length of the memory-mapped file.
- size_t length;
+ /// The length of the mapping, i.e. map_size minus kMmapRegionsGap.
+ size_t length_;
+
+ ARROW_DISALLOW_COPY_AND_ASSIGN(ClientMmapTableEntry);
};
class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
@@ -244,7 +291,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
/// Table of dlmalloc buffer files that have been memory mapped so far. This
/// is a hash table mapping a file descriptor to a struct containing the
/// address of the corresponding memory-mapped file.
- std::unordered_map<int, ClientMmapTableEntry> mmap_table_;
+ std::unordered_map<int, std::unique_ptr<ClientMmapTableEntry>> mmap_table_;
/// A hash table of the object IDs that are currently being used by this
/// client.
std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>> objects_in_use_;
@@ -277,23 +324,11 @@ PlasmaClient::Impl::~Impl() {}
+/// Returns the client-side address of the store file identified by
+/// store_fd_val, memory-mapping `fd` (map_size bytes as reported by the store)
+/// through a new ClientMmapTableEntry the first time the file is seen.
uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
auto entry = mmap_table_.find(store_fd_val);
if (entry != mmap_table_.end()) {
- return entry->second.pointer;
+ return entry->second->pointer();
} else {
- // We subtract kMmapRegionsGap from the length that was added
- // in fake_mmap in malloc.h, to make map_size page-aligned again.
- uint8_t* result = reinterpret_cast<uint8_t*>(mmap(
- NULL, map_size - kMmapRegionsGap, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
- // TODO(pcm): Don't fail here, instead return a Status.
- if (result == MAP_FAILED) {
- ARROW_LOG(FATAL) << "mmap failed";
- }
- close(fd); // Closing this fd has an effect on performance.
-
- ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
- entry.fd = fd;
- entry.pointer = result;
- entry.length = map_size;
- return result;
+ // Construct the entry before inserting it, so an empty unique_ptr can never
+ // be left in the table, and reuse the iterator returned by emplace instead
+ // of hashing store_fd_val a second time.
+ auto inserted = mmap_table_.emplace(
+ store_fd_val,
+ std::unique_ptr<ClientMmapTableEntry>(new ClientMmapTableEntry(fd, map_size)));
+ return inserted.first->second->pointer();
}
}
@@ -302,7 +337,7 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_
+/// Returns the address of a store file that has already been mapped; fails an
+/// ARROW_CHECK if store_fd_val is not in the mmap table.
uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
auto entry = mmap_table_.find(store_fd_val);
ARROW_CHECK(entry != mmap_table_.end());
- return entry->second.pointer;
+ return entry->second->pointer();
}
bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
@@ -317,7 +352,7 @@ int PlasmaClient::Impl::GetStoreFd(int store_fd) {
ARROW_CHECK(fd >= 0) << "recv not successful";
return fd;
} else {
- return entry->second.fd;
+ return entry->second->fd();
}
}
@@ -369,8 +404,9 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
ARROW_CHECK(object.metadata_size == metadata_size);
// The metadata should come right after the data.
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
- *data = std::make_shared<MutableBuffer>(
- LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
+ *data = std::make_shared<PlasmaMutableBuffer>(
+ shared_from_this(), LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset,
+ data_size);
// If plasma_create is being called from a transfer, then we will not copy the
// metadata here. The metadata will be written along with the data streamed
// from the transfer.
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 8d1d6a7..facfd37 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -257,6 +257,7 @@ class ARROW_EXPORT PlasmaClient {
private:
friend class PlasmaBuffer;
+ friend class PlasmaMutableBuffer;
FRIEND_TEST(TestPlasmaStore, GetTest);
FRIEND_TEST(TestPlasmaStore, LegacyGetTest);
FRIEND_TEST(TestPlasmaStore, AbortTest);