You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ro...@apache.org on 2018/08/02 03:45:52 UTC
[arrow] branch master updated: ARROW-2953: [Plasma] Reduce plasma memory usage
This is an automated email from the ASF dual-hosted git repository.
robertnishihara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d48dce2 ARROW-2953: [Plasma] Reduce plasma memory usage
d48dce2 is described below
commit d48dce2cfebdbd044a8260d0a77f5fe3d89a4a2d
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Wed Aug 1 20:45:44 2018 -0700
ARROW-2953: [Plasma] Reduce plasma memory usage
Together with the previous PR, this already reduces the memory consumption by 40%.
Further reductions could be achieved by factoring some common fields out of the object table (int fd, int device_num, int64_t map_size); in addition, we only need one of offset and pointer.
Author: Philipp Moritz <pc...@gmail.com>
Closes #2359 from pcmoritz/plasma-memory-usage and squashes the following commits:
40f998e <Philipp Moritz> linting
00708e4 <Philipp Moritz> linting
7d0f74a <Philipp Moritz> cleanups
fbbf73b <Philipp Moritz> fix
fdad9c6 <Philipp Moritz> fix
bf2c969 <Philipp Moritz> more memory optimizations
---
cpp/src/plasma/eviction_policy.cc | 28 ++++++++++++++--------------
cpp/src/plasma/plasma.h | 6 ++++--
cpp/src/plasma/store.cc | 28 +++++++++++++++++++---------
3 files changed, 37 insertions(+), 25 deletions(-)
diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
index ebe4e1a..cace588 100644
--- a/cpp/src/plasma/eviction_policy.cc
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -24,7 +24,7 @@ namespace plasma {
void LRUCache::Add(const ObjectID& key, int64_t size) {
auto it = item_map_.find(key);
ARROW_CHECK(it == item_map_.end());
- /* Note that it is important to use a list so the iterators stay valid. */
+ // Note that it is important to use a list so the iterators stay valid.
item_list_.emplace_front(key, size);
item_map_.emplace(key, item_list_.begin());
}
@@ -55,11 +55,11 @@ int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required,
std::vector<ObjectID>* objects_to_evict) {
int64_t bytes_evicted =
cache_.ChooseObjectsToEvict(num_bytes_required, objects_to_evict);
- /* Update the LRU cache. */
+ // Update the LRU cache.
for (auto& object_id : *objects_to_evict) {
cache_.Remove(object_id);
}
- /* Update the number of bytes used. */
+ // Update the number of bytes used.
memory_used_ -= bytes_evicted;
ARROW_CHECK(memory_used_ >= 0);
return bytes_evicted;
@@ -67,20 +67,20 @@ int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required,
void EvictionPolicy::ObjectCreated(const ObjectID& object_id) {
auto entry = store_info_->objects[object_id].get();
- cache_.Add(object_id, entry->info.data_size + entry->info.metadata_size);
- int64_t size = entry->info.data_size + entry->info.metadata_size;
+ cache_.Add(object_id, entry->data_size + entry->metadata_size);
+ int64_t size = entry->data_size + entry->metadata_size;
memory_used_ += size;
ARROW_CHECK(memory_used_ <= store_info_->memory_capacity);
}
bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_to_evict) {
- /* Check if there is enough space to create the object. */
+ // Check if there is enough space to create the object.
int64_t required_space = memory_used_ + size - store_info_->memory_capacity;
- /* Try to free up at least as much space as we need right now but ideally
- * up to 20% of the total capacity. */
+ // Try to free up at least as much space as we need right now but ideally
+ // up to 20% of the total capacity.
int64_t space_to_free = std::max(required_space, store_info_->memory_capacity / 5);
ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects";
- /* Choose some objects to evict, and update the return pointers. */
+ // Choose some objects to evict, and update the return pointers.
int64_t num_bytes_evicted = ChooseObjectsToEvict(space_to_free, objects_to_evict);
ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting "
<< objects_to_evict->size() << " objects to free up "
@@ -90,23 +90,23 @@ bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_t
void EvictionPolicy::BeginObjectAccess(const ObjectID& object_id,
std::vector<ObjectID>* objects_to_evict) {
- /* If the object is in the LRU cache, remove it. */
+ // If the object is in the LRU cache, remove it.
cache_.Remove(object_id);
}
void EvictionPolicy::EndObjectAccess(const ObjectID& object_id,
std::vector<ObjectID>* objects_to_evict) {
auto entry = store_info_->objects[object_id].get();
- /* Add the object to the LRU cache.*/
- cache_.Add(object_id, entry->info.data_size + entry->info.metadata_size);
+ // Add the object to the LRU cache.
+ cache_.Add(object_id, entry->data_size + entry->metadata_size);
}
void EvictionPolicy::RemoveObject(const ObjectID& object_id) {
- /* If the object is in the LRU cache, remove it. */
+ // If the object is in the LRU cache, remove it.
cache_.Remove(object_id);
auto entry = store_info_->objects[object_id].get();
- int64_t size = entry->info.data_size + entry->info.metadata_size;
+ int64_t size = entry->data_size + entry->metadata_size;
ARROW_CHECK(memory_used_ >= size);
memory_used_ -= size;
}
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 11c2b2b..57ba882 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -116,8 +116,6 @@ struct ObjectTableEntry {
~ObjectTableEntry();
- /// Object info like size, creation time and owner.
- flatbuf::ObjectInfoT info;
/// Memory mapped file containing the object.
int fd;
/// Device number.
@@ -128,6 +126,10 @@ struct ObjectTableEntry {
ptrdiff_t offset;
/// Pointer to the object data. Needed to free the object.
uint8_t* pointer;
+ /// Size of the object in bytes.
+ int64_t data_size;
+ /// Size of the object metadata in bytes.
+ int64_t metadata_size;
#ifdef PLASMA_GPU
/// IPC GPU handle to share with clients.
std::shared_ptr<CudaIpcMemHandle> ipc_handle;
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index d68f6e3..8cef3e3 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -205,9 +205,8 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
assert(fd != -1);
}
auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
- entry->info.object_id = object_id.binary();
- entry->info.data_size = data_size;
- entry->info.metadata_size = metadata_size;
+ entry->data_size = data_size;
+ entry->metadata_size = metadata_size;
entry->pointer = pointer;
// TODO(pcm): Set the other fields.
entry->fd = fd;
@@ -248,9 +247,9 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
#endif
object->store_fd = entry->fd;
object->data_offset = entry->offset;
- object->metadata_offset = entry->offset + entry->info.data_size;
- object->data_size = entry->info.data_size;
- object->metadata_size = entry->info.metadata_size;
+ object->metadata_offset = entry->offset + entry->data_size;
+ object->data_size = entry->data_size;
+ object->metadata_size = entry->metadata_size;
object->device_num = entry->device_num;
}
@@ -445,9 +444,14 @@ void PlasmaStore::SealObject(const ObjectID& object_id, unsigned char digest[])
// Set the state of object to SEALED.
entry->state = ObjectState::PLASMA_SEALED;
// Set the object digest.
- entry->info.digest = std::string(reinterpret_cast<char*>(&digest[0]), kDigestSize);
+ std::memcpy(&entry->digest[0], &digest[0], kDigestSize);
// Inform all subscribers that a new object has been sealed.
- PushNotification(&entry->info);
+ ObjectInfoT info;
+ info.object_id = object_id.binary();
+ info.data_size = entry->data_size;
+ info.metadata_size = entry->metadata_size;
+ info.digest = std::string(reinterpret_cast<char*>(&digest[0]), kDigestSize);
+ PushNotification(&info);
// Update all get requests that involve this object.
UpdateObjectGetRequests(object_id);
@@ -701,7 +705,13 @@ void PlasmaStore::SubscribeToUpdates(Client* client) {
// Push notifications to the new subscriber about existing sealed objects.
for (const auto& entry : store_info_.objects) {
if (entry.second->state == ObjectState::PLASMA_SEALED) {
- PushNotification(&entry.second->info, fd);
+ ObjectInfoT info;
+ info.object_id = entry.first.binary();
+ info.data_size = entry.second->data_size;
+ info.metadata_size = entry.second->metadata_size;
+ info.digest =
+ std::string(reinterpret_cast<char*>(&entry.second->digest[0]), kDigestSize);
+ PushNotification(&info, fd);
}
}
}