You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/05/15 22:54:34 UTC
[arrow] branch master updated: ARROW-2558: [Plasma] avoid walk
through all the objects when a client disconnects
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 38db8ed ARROW-2558: [Plasma] avoid walk through all the objects when a client disconnects
38db8ed is described below
commit 38db8edeeec07aea5be84829b38f0a9b562eeba8
Author: Zhijun Fu <zh...@outlook.com>
AuthorDate: Tue May 15 15:54:25 2018 -0700
ARROW-2558: [Plasma] avoid walk through all the objects when a client disconnects
Currently, Plasma stores a list of clients in each ObjectTableEntry to track which clients are using a given object. This serves two purposes:
- Determining whether an object is in use.
- Determining whether the client trying to abort an object is the one that created it.
A problem with the list-of-clients approach is that when a client disconnects, we need to walk through all the objects and remove the client pointer from each object's list.
Instead, we could add a reference count to ObjectTableEntry and store a list of object IDs in the client structure. This achieves both goals of the original approach, and when a client disconnects, the store just walks through that client's object IDs and releases its reference on each corresponding ObjectTableEntry; there is no need to walk through all objects.
Author: Zhijun Fu <zh...@outlook.com>
Closes #2015 from zhijunfu/client_object_ids and squashes the following commits:
d8db8f75 <Zhijun Fu> Address comments from pcmoritz
8a439e88 <Zhijun Fu> Trigger
a0475725 <Zhijun Fu> use list-of-object-ids in client instead of list-of-clients in object
---
cpp/src/plasma/plasma.cc | 2 +-
cpp/src/plasma/plasma.h | 5 ++--
cpp/src/plasma/store.cc | 75 +++++++++++++++++++++++++++---------------------
cpp/src/plasma/store.h | 8 ++++--
4 files changed, 52 insertions(+), 38 deletions(-)
diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
index 60b7c3f..d98cbb9 100644
--- a/cpp/src/plasma/plasma.cc
+++ b/cpp/src/plasma/plasma.cc
@@ -30,7 +30,7 @@ extern "C" {
void dlfree(void* mem);
}
-ObjectTableEntry::ObjectTableEntry() : pointer(nullptr) {}
+ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {}
ObjectTableEntry::~ObjectTableEntry() {
dlfree(pointer);
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 7a513ea..8673036 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -134,8 +134,9 @@ struct ObjectTableEntry {
/// IPC GPU handle to share with clients.
std::shared_ptr<CudaIpcMemHandle> ipc_handle;
#endif
- /// Set of clients currently using this object.
- std::unordered_set<Client*> clients;
+ /// Number of clients currently using this object.
+ int ref_count;
+
/// The state of the object, e.g., whether it is open or sealed.
object_state state;
/// The digest of the object. Used to see if two objects are the same.
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 3c1d5c8..95fbf49 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -124,21 +124,24 @@ const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info
// If this client is not already using the object, add the client to the
// object's list of clients, otherwise do nothing.
-void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) {
+void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* client) {
// Check if this client is already using the object.
- if (entry->clients.find(client) != entry->clients.end()) {
+ if (client->object_ids.find(entry->object_id) != client->object_ids.end()) {
return;
}
// If there are no other clients using this object, notify the eviction policy
// that the object is being used.
- if (entry->clients.size() == 0) {
+ if (entry->ref_count == 0) {
// Tell the eviction policy that this object is being used.
std::vector<ObjectID> objects_to_evict;
eviction_policy_.begin_object_access(entry->object_id, &objects_to_evict);
delete_objects(objects_to_evict);
}
- // Add the client pointer to the list of clients using this object.
- entry->clients.insert(client);
+ // Increase reference count.
+ entry->ref_count++;
+
+ // Add object id to the list of object ids that this client is using.
+ client->object_ids.insert(entry->object_id);
}
// Create a new object buffer in the hash table.
@@ -225,11 +228,11 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
result->metadata_size = metadata_size;
result->device_num = device_num;
// Notify the eviction policy that this object was created. This must be done
- // immediately before the call to add_client_to_object_clients so that the
+ // immediately before the call to add_to_client_object_ids so that the
// eviction policy does not have an opportunity to evict the object.
eviction_policy_.object_created(object_id);
// Record that this client is using this object.
- add_client_to_object_clients(store_info_.objects[object_id].get(), client);
+ add_to_client_object_ids(store_info_.objects[object_id].get(), client);
return PlasmaError_OK;
}
@@ -324,7 +327,7 @@ void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
get_req->num_satisfied += 1;
// Record the fact that this client will be using this object and will
// be responsible for releasing this object.
- add_client_to_object_clients(entry, get_req->client);
+ add_to_client_object_ids(entry, get_req->client);
// If this get request is done, reply to the client.
if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
@@ -358,7 +361,7 @@ void PlasmaStore::process_get_request(Client* client,
get_req->num_satisfied += 1;
// If necessary, record that this client is using this object. In the case
// where entry == NULL, this will be called from seal_object.
- add_client_to_object_clients(entry, client);
+ add_to_client_object_ids(entry, client);
} else {
// Add a placeholder plasma object to the get request to indicate that the
// object is not present. This will be parsed by the client. We set the
@@ -383,14 +386,16 @@ void PlasmaStore::process_get_request(Client* client,
}
}
-int PlasmaStore::remove_client_from_object_clients(ObjectTableEntry* entry,
- Client* client) {
- auto it = entry->clients.find(client);
- if (it != entry->clients.end()) {
- entry->clients.erase(it);
+int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client* client) {
+ auto it = client->object_ids.find(entry->object_id);
+ if (it != client->object_ids.end()) {
+ client->object_ids.erase(it);
+ // Decrease reference count.
+ entry->ref_count--;
+
// If no more clients are using this object, notify the eviction policy
// that the object is no longer being used.
- if (entry->clients.size() == 0) {
+ if (entry->ref_count == 0) {
// Tell the eviction policy that this object is no longer being used.
std::vector<ObjectID> objects_to_evict;
eviction_policy_.end_object_access(entry->object_id, &objects_to_evict);
@@ -408,7 +413,7 @@ void PlasmaStore::release_object(const ObjectID& object_id, Client* client) {
auto entry = get_object_table_entry(&store_info_, object_id);
ARROW_CHECK(entry != NULL);
// Remove the client from the object's array of clients.
- ARROW_CHECK(remove_client_from_object_clients(entry, client) == 1);
+ ARROW_CHECK(remove_from_client_object_ids(entry, client) == 1);
}
// Check if an object is present.
@@ -439,8 +444,8 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table.";
ARROW_CHECK(entry->state != PLASMA_SEALED)
<< "To abort an object it must not have been sealed.";
- auto it = entry->clients.find(client);
- if (it == entry->clients.end()) {
+ auto it = client->object_ids.find(object_id);
+ if (it == client->object_ids.end()) {
// If the client requesting the abort is not the creator, do not
// perform the abort.
return 0;
@@ -466,7 +471,7 @@ int PlasmaStore::delete_object(ObjectID& object_id) {
return PlasmaError_ObjectNotSealed;
}
- if (entry->clients.size() != 0) {
+ if (entry->ref_count != 0) {
// To delete an object, there must be no clients currently using it.
return PlasmaError_ObjectInUse;
}
@@ -493,7 +498,7 @@ void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table.";
ARROW_CHECK(entry->state == PLASMA_SEALED)
<< "To delete an object it must have been sealed.";
- ARROW_CHECK(entry->clients.size() == 0)
+ ARROW_CHECK(entry->ref_count == 0)
<< "To delete an object, there must be no clients currently using it.";
store_info_.objects.erase(object_id);
// Inform all subscribers that the object has been deleted.
@@ -529,23 +534,27 @@ void PlasmaStore::disconnect_client(int client_fd) {
// Close the socket.
close(client_fd);
ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
- // If this client was using any objects, remove it from the appropriate
- // lists.
- // TODO(swang): Avoid iteration through the object table.
+ // Release all the objects that the client was using.
auto client = it->second.get();
- std::vector<ObjectID> unsealed_objects;
- for (const auto& entry : store_info_.objects) {
- if (entry.second->state == PLASMA_SEALED) {
- remove_client_from_object_clients(entry.second.get(), client);
+ std::vector<ObjectTableEntry*> sealed_objects;
+ for (const auto& object_id : client->object_ids) {
+ auto it = store_info_.objects.find(object_id);
+ if (it == store_info_.objects.end()) {
+ continue;
+ }
+
+ if (it->second->state == PLASMA_SEALED) {
+ // Add sealed objects to a temporary list of object IDs. Do not perform
+ // the remove here, since it potentially modifies the object_ids table.
+ sealed_objects.push_back(it->second.get());
} else {
- // Add unsealed objects to a temporary list of object IDs. Do not perform
- // the abort here, since it potentially modifies the object table.
- unsealed_objects.push_back(entry.first);
+ // Abort unsealed object.
+ abort_object(it->first, client);
}
}
- // If the client was creating any objects, abort them.
- for (const auto& entry : unsealed_objects) {
- abort_object(entry, client);
+
+ for (const auto& entry : sealed_objects) {
+ remove_from_client_object_ids(entry, client);
}
// Note, the store may still attempt to send a message to the disconnected
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index ac6b2c4..fd077f9 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -22,6 +22,7 @@
#include <memory>
#include <string>
#include <unordered_map>
+#include <unordered_set>
#include <vector>
#include "plasma/common.h"
@@ -46,6 +47,9 @@ struct Client {
/// The file descriptor used to communicate with the client.
int fd;
+
+ /// Object ids that are used by this client.
+ std::unordered_set<ObjectID, UniqueIDHasher> object_ids;
};
class PlasmaStore {
@@ -164,13 +168,13 @@ class PlasmaStore {
private:
void push_notification(ObjectInfoT* object_notification);
- void add_client_to_object_clients(ObjectTableEntry* entry, Client* client);
+ void add_to_client_object_ids(ObjectTableEntry* entry, Client* client);
void return_from_get(GetRequest* get_req);
void update_object_get_requests(const ObjectID& object_id);
- int remove_client_from_object_clients(ObjectTableEntry* entry, Client* client);
+ int remove_from_client_object_ids(ObjectTableEntry* entry, Client* client);
/// Event loop of the plasma store.
EventLoop* loop_;
--
To stop receiving notification emails like this one, please contact
pcmoritz@apache.org.