You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/05/15 22:54:34 UTC

[arrow] branch master updated: ARROW-2558: [Plasma] avoid walk through all the objects when a client disconnects

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 38db8ed  ARROW-2558: [Plasma] avoid walk through all the objects when a client disconnects
38db8ed is described below

commit 38db8edeeec07aea5be84829b38f0a9b562eeba8
Author: Zhijun Fu <zh...@outlook.com>
AuthorDate: Tue May 15 15:54:25 2018 -0700

    ARROW-2558: [Plasma] avoid walk through all the objects when a client disconnects
    
    Currently, Plasma stores a list of clients in each ObjectTableEntry to track which clients are using a given object. This serves two purposes:
    - Determining whether an object is in use.
    - Determining whether the client trying to abort an object is the one that created it.
    
    A problem with the list-of-clients approach is that when a client disconnects, the store must walk through all objects and remove the client pointer from each object's list.
    
    Instead, we add a reference count to ObjectTableEntry and store the set of object IDs a client is using in the Client structure. This serves both purposes of the original approach. When a client disconnects, the store only walks through that client's object IDs and decrements the reference count of each corresponding ObjectTableEntry; there is no need to walk through all objects.
    
    Author: Zhijun Fu <zh...@outlook.com>
    
    Closes #2015 from zhijunfu/client_object_ids and squashes the following commits:
    
    d8db8f75 <Zhijun Fu> Address comments from pcmoritz
    8a439e88 <Zhijun Fu> Trigger
    a0475725 <Zhijun Fu>  use list-of-object-ids in client instead of list-of-clients in object
---
 cpp/src/plasma/plasma.cc |  2 +-
 cpp/src/plasma/plasma.h  |  5 ++--
 cpp/src/plasma/store.cc  | 75 +++++++++++++++++++++++++++---------------------
 cpp/src/plasma/store.h   |  8 ++++--
 4 files changed, 52 insertions(+), 38 deletions(-)

diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
index 60b7c3f..d98cbb9 100644
--- a/cpp/src/plasma/plasma.cc
+++ b/cpp/src/plasma/plasma.cc
@@ -30,7 +30,7 @@ extern "C" {
 void dlfree(void* mem);
 }
 
-ObjectTableEntry::ObjectTableEntry() : pointer(nullptr) {}
+ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {}
 
 ObjectTableEntry::~ObjectTableEntry() {
   dlfree(pointer);
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 7a513ea..8673036 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -134,8 +134,9 @@ struct ObjectTableEntry {
   /// IPC GPU handle to share with clients.
   std::shared_ptr<CudaIpcMemHandle> ipc_handle;
 #endif
-  /// Set of clients currently using this object.
-  std::unordered_set<Client*> clients;
+  /// Number of clients currently using this object.
+  int ref_count;
+
   /// The state of the object, e.g., whether it is open or sealed.
   object_state state;
   /// The digest of the object. Used to see if two objects are the same.
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 3c1d5c8..95fbf49 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -124,21 +124,24 @@ const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info
 
 // If this client is not already using the object, add the client to the
 // object's list of clients, otherwise do nothing.
-void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) {
+void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* client) {
   // Check if this client is already using the object.
-  if (entry->clients.find(client) != entry->clients.end()) {
+  if (client->object_ids.find(entry->object_id) != client->object_ids.end()) {
     return;
   }
   // If there are no other clients using this object, notify the eviction policy
   // that the object is being used.
-  if (entry->clients.size() == 0) {
+  if (entry->ref_count == 0) {
     // Tell the eviction policy that this object is being used.
     std::vector<ObjectID> objects_to_evict;
     eviction_policy_.begin_object_access(entry->object_id, &objects_to_evict);
     delete_objects(objects_to_evict);
   }
-  // Add the client pointer to the list of clients using this object.
-  entry->clients.insert(client);
+  // Increase reference count.
+  entry->ref_count++;
+
+  // Add object id to the list of object ids that this client is using.
+  client->object_ids.insert(entry->object_id);
 }
 
 // Create a new object buffer in the hash table.
@@ -225,11 +228,11 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
   result->metadata_size = metadata_size;
   result->device_num = device_num;
   // Notify the eviction policy that this object was created. This must be done
-  // immediately before the call to add_client_to_object_clients so that the
+  // immediately before the call to add_to_client_object_ids so that the
   // eviction policy does not have an opportunity to evict the object.
   eviction_policy_.object_created(object_id);
   // Record that this client is using this object.
-  add_client_to_object_clients(store_info_.objects[object_id].get(), client);
+  add_to_client_object_ids(store_info_.objects[object_id].get(), client);
   return PlasmaError_OK;
 }
 
@@ -324,7 +327,7 @@ void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
     get_req->num_satisfied += 1;
     // Record the fact that this client will be using this object and will
     // be responsible for releasing this object.
-    add_client_to_object_clients(entry, get_req->client);
+    add_to_client_object_ids(entry, get_req->client);
 
     // If this get request is done, reply to the client.
     if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
@@ -358,7 +361,7 @@ void PlasmaStore::process_get_request(Client* client,
       get_req->num_satisfied += 1;
       // If necessary, record that this client is using this object. In the case
       // where entry == NULL, this will be called from seal_object.
-      add_client_to_object_clients(entry, client);
+      add_to_client_object_ids(entry, client);
     } else {
       // Add a placeholder plasma object to the get request to indicate that the
       // object is not present. This will be parsed by the client. We set the
@@ -383,14 +386,16 @@ void PlasmaStore::process_get_request(Client* client,
   }
 }
 
-int PlasmaStore::remove_client_from_object_clients(ObjectTableEntry* entry,
-                                                   Client* client) {
-  auto it = entry->clients.find(client);
-  if (it != entry->clients.end()) {
-    entry->clients.erase(it);
+int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client* client) {
+  auto it = client->object_ids.find(entry->object_id);
+  if (it != client->object_ids.end()) {
+    client->object_ids.erase(it);
+    // Decrease reference count.
+    entry->ref_count--;
+
     // If no more clients are using this object, notify the eviction policy
     // that the object is no longer being used.
-    if (entry->clients.size() == 0) {
+    if (entry->ref_count == 0) {
       // Tell the eviction policy that this object is no longer being used.
       std::vector<ObjectID> objects_to_evict;
       eviction_policy_.end_object_access(entry->object_id, &objects_to_evict);
@@ -408,7 +413,7 @@ void PlasmaStore::release_object(const ObjectID& object_id, Client* client) {
   auto entry = get_object_table_entry(&store_info_, object_id);
   ARROW_CHECK(entry != NULL);
   // Remove the client from the object's array of clients.
-  ARROW_CHECK(remove_client_from_object_clients(entry, client) == 1);
+  ARROW_CHECK(remove_from_client_object_ids(entry, client) == 1);
 }
 
 // Check if an object is present.
@@ -439,8 +444,8 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
   ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table.";
   ARROW_CHECK(entry->state != PLASMA_SEALED)
       << "To abort an object it must not have been sealed.";
-  auto it = entry->clients.find(client);
-  if (it == entry->clients.end()) {
+  auto it = client->object_ids.find(object_id);
+  if (it == client->object_ids.end()) {
     // If the client requesting the abort is not the creator, do not
     // perform the abort.
     return 0;
@@ -466,7 +471,7 @@ int PlasmaStore::delete_object(ObjectID& object_id) {
     return PlasmaError_ObjectNotSealed;
   }
 
-  if (entry->clients.size() != 0) {
+  if (entry->ref_count != 0) {
     // To delete an object, there must be no clients currently using it.
     return PlasmaError_ObjectInUse;
   }
@@ -493,7 +498,7 @@ void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
     ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table.";
     ARROW_CHECK(entry->state == PLASMA_SEALED)
         << "To delete an object it must have been sealed.";
-    ARROW_CHECK(entry->clients.size() == 0)
+    ARROW_CHECK(entry->ref_count == 0)
         << "To delete an object, there must be no clients currently using it.";
     store_info_.objects.erase(object_id);
     // Inform all subscribers that the object has been deleted.
@@ -529,23 +534,27 @@ void PlasmaStore::disconnect_client(int client_fd) {
   // Close the socket.
   close(client_fd);
   ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
-  // If this client was using any objects, remove it from the appropriate
-  // lists.
-  // TODO(swang): Avoid iteration through the object table.
+  // Release all the objects that the client was using.
   auto client = it->second.get();
-  std::vector<ObjectID> unsealed_objects;
-  for (const auto& entry : store_info_.objects) {
-    if (entry.second->state == PLASMA_SEALED) {
-      remove_client_from_object_clients(entry.second.get(), client);
+  std::vector<ObjectTableEntry*> sealed_objects;
+  for (const auto& object_id : client->object_ids) {
+    auto it = store_info_.objects.find(object_id);
+    if (it == store_info_.objects.end()) {
+      continue;
+    }
+
+    if (it->second->state == PLASMA_SEALED) {
+      // Add sealed objects to a temporary list of object IDs. Do not perform
+      // the remove here, since it potentially modifies the object_ids table.
+      sealed_objects.push_back(it->second.get());
     } else {
-      // Add unsealed objects to a temporary list of object IDs. Do not perform
-      // the abort here, since it potentially modifies the object table.
-      unsealed_objects.push_back(entry.first);
+      // Abort unsealed object.
+      abort_object(it->first, client);
     }
   }
-  // If the client was creating any objects, abort them.
-  for (const auto& entry : unsealed_objects) {
-    abort_object(entry, client);
+
+  for (const auto& entry : sealed_objects) {
+    remove_from_client_object_ids(entry, client);
   }
 
   // Note, the store may still attempt to send a message to the disconnected
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index ac6b2c4..fd077f9 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -22,6 +22,7 @@
 #include <memory>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "plasma/common.h"
@@ -46,6 +47,9 @@ struct Client {
 
   /// The file descriptor used to communicate with the client.
   int fd;
+
+  /// Object ids that are used by this client.
+  std::unordered_set<ObjectID, UniqueIDHasher> object_ids;
 };
 
 class PlasmaStore {
@@ -164,13 +168,13 @@ class PlasmaStore {
  private:
   void push_notification(ObjectInfoT* object_notification);
 
-  void add_client_to_object_clients(ObjectTableEntry* entry, Client* client);
+  void add_to_client_object_ids(ObjectTableEntry* entry, Client* client);
 
   void return_from_get(GetRequest* get_req);
 
   void update_object_get_requests(const ObjectID& object_id);
 
-  int remove_client_from_object_clients(ObjectTableEntry* entry, Client* client);
+  int remove_from_client_object_ids(ObjectTableEntry* entry, Client* client);
 
   /// Event loop of the plasma store.
   EventLoop* loop_;

-- 
To stop receiving notification emails like this one, please contact
pcmoritz@apache.org.