You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/08/01 13:56:04 UTC

[arrow] branch master updated: ARROW-2954: [Plasma] Reduce plasma store memory usage

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b24bc2  ARROW-2954: [Plasma] Reduce plasma store memory usage
3b24bc2 is described below

commit 3b24bc272af7ee46a7d9f125169bab20873f3dc8
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Wed Aug 1 09:55:58 2018 -0400

    ARROW-2954: [Plasma] Reduce plasma store memory usage
    
    Do not store the object id in each object table entry (it is already stored as the key in the object table hash map).
    
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #2353 from pcmoritz/reduce-memory-usage and squashes the following commits:
    
    7853f4af <Philipp Moritz> fix
    33d1992b <Philipp Moritz> fix linting
    776b52bf <Philipp Moritz> reduce plasma store memory usage
---
 cpp/src/plasma/plasma.h |  2 --
 cpp/src/plasma/store.cc | 37 +++++++++++++++++++------------------
 cpp/src/plasma/store.h  |  6 ++++--
 3 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 6e15807..11c2b2b 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -116,8 +116,6 @@ struct ObjectTableEntry {
 
   ~ObjectTableEntry();
 
-  /// Object id of this object.
-  ObjectID object_id;
   /// Object info like size, creation time and owner.
   flatbuf::ObjectInfoT info;
   /// Memory mapped file containing the object.
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index f55f3c9..d68f6e3 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -125,9 +125,10 @@ const PlasmaStoreInfo* PlasmaStore::GetPlasmaStoreInfo() { return &store_info_;
 
 // If this client is not already using the object, add the client to the
 // object's list of clients, otherwise do nothing.
-void PlasmaStore::AddToClientObjectIds(ObjectTableEntry* entry, Client* client) {
+void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
+                                       Client* client) {
   // Check if this client is already using the object.
-  if (client->object_ids.find(entry->object_id) != client->object_ids.end()) {
+  if (client->object_ids.find(object_id) != client->object_ids.end()) {
     return;
   }
   // If there are no other clients using this object, notify the eviction policy
@@ -135,14 +136,14 @@ void PlasmaStore::AddToClientObjectIds(ObjectTableEntry* entry, Client* client)
   if (entry->ref_count == 0) {
     // Tell the eviction policy that this object is being used.
     std::vector<ObjectID> objects_to_evict;
-    eviction_policy_.BeginObjectAccess(entry->object_id, &objects_to_evict);
+    eviction_policy_.BeginObjectAccess(object_id, &objects_to_evict);
     DeleteObjects(objects_to_evict);
   }
   // Increase reference count.
   entry->ref_count++;
 
   // Add object id to the list of object ids that this client is using.
-  client->object_ids.insert(entry->object_id);
+  client->object_ids.insert(object_id);
 }
 
 // Create a new object buffer in the hash table.
@@ -204,7 +205,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
     assert(fd != -1);
   }
   auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
-  entry->object_id = object_id;
   entry->info.object_id = object_id.binary();
   entry->info.data_size = data_size;
   entry->info.metadata_size = metadata_size;
@@ -233,7 +233,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
   // eviction policy does not have an opportunity to evict the object.
   eviction_policy_.ObjectCreated(object_id);
   // Record that this client is using this object.
-  AddToClientObjectIds(store_info_.objects[object_id].get(), client);
+  AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client);
   return PlasmaError::OK;
 }
 
@@ -331,7 +331,7 @@ void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) {
     get_req->num_satisfied += 1;
     // Record the fact that this client will be using this object and will
     // be responsible for releasing this object.
-    AddToClientObjectIds(entry, get_req->client);
+    AddToClientObjectIds(object_id, entry, get_req->client);
 
     // If this get request is done, reply to the client.
     if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
@@ -365,7 +365,7 @@ void PlasmaStore::ProcessGetRequest(Client* client,
       get_req->num_satisfied += 1;
       // If necessary, record that this client is using this object. In the case
       // where entry == NULL, this will be called from SealObject.
-      AddToClientObjectIds(entry, client);
+      AddToClientObjectIds(object_id, entry, client);
     } else {
       // Add a placeholder plasma object to the get request to indicate that the
       // object is not present. This will be parsed by the client. We set the
@@ -390,8 +390,9 @@ void PlasmaStore::ProcessGetRequest(Client* client,
   }
 }
 
-int PlasmaStore::RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* client) {
-  auto it = client->object_ids.find(entry->object_id);
+int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id,
+                                           ObjectTableEntry* entry, Client* client) {
+  auto it = client->object_ids.find(object_id);
   if (it != client->object_ids.end()) {
     client->object_ids.erase(it);
     // Decrease reference count.
@@ -400,16 +401,16 @@ int PlasmaStore::RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* clie
     // If no more clients are using this object, notify the eviction policy
     // that the object is no longer being used.
     if (entry->ref_count == 0) {
-      if (deletion_cache_.count(entry->object_id) == 0) {
+      if (deletion_cache_.count(object_id) == 0) {
         // Tell the eviction policy that this object is no longer being used.
         std::vector<ObjectID> objects_to_evict;
-        eviction_policy_.EndObjectAccess(entry->object_id, &objects_to_evict);
+        eviction_policy_.EndObjectAccess(object_id, &objects_to_evict);
         DeleteObjects(objects_to_evict);
       } else {
         // Above code does not really delete an object. Instead, it just put an
         // object to LRU cache which will be cleaned when the memory is not enough.
-        deletion_cache_.erase(entry->object_id);
-        DeleteObjects({entry->object_id});
+        deletion_cache_.erase(object_id);
+        DeleteObjects({object_id});
       }
     }
     // Return 1 to indicate that the client was removed.
@@ -424,7 +425,7 @@ void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) {
   auto entry = GetObjectTableEntry(&store_info_, object_id);
   ARROW_CHECK(entry != nullptr);
   // Remove the client from the object's array of clients.
-  ARROW_CHECK(RemoveFromClientObjectIds(entry, client) == 1);
+  ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
 }
 
 // Check if an object is present.
@@ -554,7 +555,7 @@ void PlasmaStore::DisconnectClient(int client_fd) {
   ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
   // Release all the objects that the client was using.
   auto client = it->second.get();
-  std::vector<ObjectTableEntry*> sealed_objects;
+  std::unordered_map<ObjectID, ObjectTableEntry*> sealed_objects;
   for (const auto& object_id : client->object_ids) {
     auto it = store_info_.objects.find(object_id);
     if (it == store_info_.objects.end()) {
@@ -564,7 +565,7 @@ void PlasmaStore::DisconnectClient(int client_fd) {
     if (it->second->state == ObjectState::PLASMA_SEALED) {
       // Add sealed objects to a temporary list of object IDs. Do not perform
       // the remove here, since it potentially modifies the object_ids table.
-      sealed_objects.push_back(it->second.get());
+      sealed_objects[it->first] = it->second.get();
     } else {
       // Abort unsealed object.
       AbortObject(it->first, client);
@@ -572,7 +573,7 @@ void PlasmaStore::DisconnectClient(int client_fd) {
   }
 
   for (const auto& entry : sealed_objects) {
-    RemoveFromClientObjectIds(entry, client);
+    RemoveFromClientObjectIds(entry.first, entry.second, client);
   }
 
   if (client->notification_fd > 0) {
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 40412a8..4b83143 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -180,13 +180,15 @@ class PlasmaStore {
 
   void PushNotification(ObjectInfoT* object_notification, int client_fd);
 
-  void AddToClientObjectIds(ObjectTableEntry* entry, Client* client);
+  void AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
+                            Client* client);
 
   void ReturnFromGet(GetRequest* get_req);
 
   void UpdateObjectGetRequests(const ObjectID& object_id);
 
-  int RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* client);
+  int RemoveFromClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
+                                Client* client);
 
   /// Event loop of the plasma store.
   EventLoop* loop_;