You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ro...@apache.org on 2018/08/02 03:45:52 UTC

[arrow] branch master updated: ARROW-2953: [Plasma] Reduce plasma memory usage

This is an automated email from the ASF dual-hosted git repository.

robertnishihara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d48dce2  ARROW-2953: [Plasma] Reduce plasma memory usage
d48dce2 is described below

commit d48dce2cfebdbd044a8260d0a77f5fe3d89a4a2d
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Wed Aug 1 20:45:44 2018 -0700

    ARROW-2953: [Plasma] Reduce plasma memory usage
    
    Together with the previous PR, this already reduces the memory consumption by 40%.
    
    Memory usage could be reduced further by factoring some common fields out of the object table (int fd, int device_num, int64_t map_size) and by storing only one of offset and pointer, since only one of them is needed.
    
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #2359 from pcmoritz/plasma-memory-usage and squashes the following commits:
    
    40f998e <Philipp Moritz> linting
    00708e4 <Philipp Moritz> linting
    7d0f74a <Philipp Moritz> cleanups
    fbbf73b <Philipp Moritz> fix
    fdad9c6 <Philipp Moritz> fix
    bf2c969 <Philipp Moritz> more memory optimizations
---
 cpp/src/plasma/eviction_policy.cc | 28 ++++++++++++++--------------
 cpp/src/plasma/plasma.h           |  6 ++++--
 cpp/src/plasma/store.cc           | 28 +++++++++++++++++++---------
 3 files changed, 37 insertions(+), 25 deletions(-)

diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
index ebe4e1a..cace588 100644
--- a/cpp/src/plasma/eviction_policy.cc
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -24,7 +24,7 @@ namespace plasma {
 void LRUCache::Add(const ObjectID& key, int64_t size) {
   auto it = item_map_.find(key);
   ARROW_CHECK(it == item_map_.end());
-  /* Note that it is important to use a list so the iterators stay valid. */
+  // Note that it is important to use a list so the iterators stay valid.
   item_list_.emplace_front(key, size);
   item_map_.emplace(key, item_list_.begin());
 }
@@ -55,11 +55,11 @@ int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required,
                                              std::vector<ObjectID>* objects_to_evict) {
   int64_t bytes_evicted =
       cache_.ChooseObjectsToEvict(num_bytes_required, objects_to_evict);
-  /* Update the LRU cache. */
+  // Update the LRU cache.
   for (auto& object_id : *objects_to_evict) {
     cache_.Remove(object_id);
   }
-  /* Update the number of bytes used. */
+  // Update the number of bytes used.
   memory_used_ -= bytes_evicted;
   ARROW_CHECK(memory_used_ >= 0);
   return bytes_evicted;
@@ -67,20 +67,20 @@ int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required,
 
 void EvictionPolicy::ObjectCreated(const ObjectID& object_id) {
   auto entry = store_info_->objects[object_id].get();
-  cache_.Add(object_id, entry->info.data_size + entry->info.metadata_size);
-  int64_t size = entry->info.data_size + entry->info.metadata_size;
+  cache_.Add(object_id, entry->data_size + entry->metadata_size);
+  int64_t size = entry->data_size + entry->metadata_size;
   memory_used_ += size;
   ARROW_CHECK(memory_used_ <= store_info_->memory_capacity);
 }
 
 bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_to_evict) {
-  /* Check if there is enough space to create the object. */
+  // Check if there is enough space to create the object.
   int64_t required_space = memory_used_ + size - store_info_->memory_capacity;
-  /* Try to free up at least as much space as we need right now but ideally
-   * up to 20% of the total capacity. */
+  // Try to free up at least as much space as we need right now but ideally
+  // up to 20% of the total capacity.
   int64_t space_to_free = std::max(required_space, store_info_->memory_capacity / 5);
   ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects";
-  /* Choose some objects to evict, and update the return pointers. */
+  // Choose some objects to evict, and update the return pointers.
   int64_t num_bytes_evicted = ChooseObjectsToEvict(space_to_free, objects_to_evict);
   ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting "
                   << objects_to_evict->size() << " objects to free up "
@@ -90,23 +90,23 @@ bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_t
 
 void EvictionPolicy::BeginObjectAccess(const ObjectID& object_id,
                                        std::vector<ObjectID>* objects_to_evict) {
-  /* If the object is in the LRU cache, remove it. */
+  // If the object is in the LRU cache, remove it.
   cache_.Remove(object_id);
 }
 
 void EvictionPolicy::EndObjectAccess(const ObjectID& object_id,
                                      std::vector<ObjectID>* objects_to_evict) {
   auto entry = store_info_->objects[object_id].get();
-  /* Add the object to the LRU cache.*/
-  cache_.Add(object_id, entry->info.data_size + entry->info.metadata_size);
+  // Add the object to the LRU cache.
+  cache_.Add(object_id, entry->data_size + entry->metadata_size);
 }
 
 void EvictionPolicy::RemoveObject(const ObjectID& object_id) {
-  /* If the object is in the LRU cache, remove it. */
+  // If the object is in the LRU cache, remove it.
   cache_.Remove(object_id);
 
   auto entry = store_info_->objects[object_id].get();
-  int64_t size = entry->info.data_size + entry->info.metadata_size;
+  int64_t size = entry->data_size + entry->metadata_size;
   ARROW_CHECK(memory_used_ >= size);
   memory_used_ -= size;
 }
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 11c2b2b..57ba882 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -116,8 +116,6 @@ struct ObjectTableEntry {
 
   ~ObjectTableEntry();
 
-  /// Object info like size, creation time and owner.
-  flatbuf::ObjectInfoT info;
   /// Memory mapped file containing the object.
   int fd;
   /// Device number.
@@ -128,6 +126,10 @@ struct ObjectTableEntry {
   ptrdiff_t offset;
   /// Pointer to the object data. Needed to free the object.
   uint8_t* pointer;
+  /// Size of the object in bytes.
+  int64_t data_size;
+  /// Size of the object metadata in bytes.
+  int64_t metadata_size;
 #ifdef PLASMA_GPU
   /// IPC GPU handle to share with clients.
   std::shared_ptr<CudaIpcMemHandle> ipc_handle;
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index d68f6e3..8cef3e3 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -205,9 +205,8 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
     assert(fd != -1);
   }
   auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
-  entry->info.object_id = object_id.binary();
-  entry->info.data_size = data_size;
-  entry->info.metadata_size = metadata_size;
+  entry->data_size = data_size;
+  entry->metadata_size = metadata_size;
   entry->pointer = pointer;
   // TODO(pcm): Set the other fields.
   entry->fd = fd;
@@ -248,9 +247,9 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
 #endif
   object->store_fd = entry->fd;
   object->data_offset = entry->offset;
-  object->metadata_offset = entry->offset + entry->info.data_size;
-  object->data_size = entry->info.data_size;
-  object->metadata_size = entry->info.metadata_size;
+  object->metadata_offset = entry->offset + entry->data_size;
+  object->data_size = entry->data_size;
+  object->metadata_size = entry->metadata_size;
   object->device_num = entry->device_num;
 }
 
@@ -445,9 +444,14 @@ void PlasmaStore::SealObject(const ObjectID& object_id, unsigned char digest[])
   // Set the state of object to SEALED.
   entry->state = ObjectState::PLASMA_SEALED;
   // Set the object digest.
-  entry->info.digest = std::string(reinterpret_cast<char*>(&digest[0]), kDigestSize);
+  std::memcpy(&entry->digest[0], &digest[0], kDigestSize);
   // Inform all subscribers that a new object has been sealed.
-  PushNotification(&entry->info);
+  ObjectInfoT info;
+  info.object_id = object_id.binary();
+  info.data_size = entry->data_size;
+  info.metadata_size = entry->metadata_size;
+  info.digest = std::string(reinterpret_cast<char*>(&digest[0]), kDigestSize);
+  PushNotification(&info);
 
   // Update all get requests that involve this object.
   UpdateObjectGetRequests(object_id);
@@ -701,7 +705,13 @@ void PlasmaStore::SubscribeToUpdates(Client* client) {
   // Push notifications to the new subscriber about existing sealed objects.
   for (const auto& entry : store_info_.objects) {
     if (entry.second->state == ObjectState::PLASMA_SEALED) {
-      PushNotification(&entry.second->info, fd);
+      ObjectInfoT info;
+      info.object_id = entry.first.binary();
+      info.data_size = entry.second->data_size;
+      info.metadata_size = entry.second->metadata_size;
+      info.digest =
+          std::string(reinterpret_cast<char*>(&entry.second->digest[0]), kDigestSize);
+      PushNotification(&info, fd);
     }
   }
 }