You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/08/01 13:56:04 UTC
[arrow] branch master updated: ARROW-2954: [Plasma] Reduce plasma store memory usage
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3b24bc2 ARROW-2954: [Plasma] Reduce plasma store memory usage
3b24bc2 is described below
commit 3b24bc272af7ee46a7d9f125169bab20873f3dc8
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Wed Aug 1 09:55:58 2018 -0400
ARROW-2954: [Plasma] Reduce plasma store memory usage
Do not store the object id in the object table (it is already stored as the key in the hash map).
Author: Philipp Moritz <pc...@gmail.com>
Closes #2353 from pcmoritz/reduce-memory-usage and squashes the following commits:
7853f4af <Philipp Moritz> fix
33d1992b <Philipp Moritz> fix linting
776b52bf <Philipp Moritz> reduce plasma store memory usage
---
cpp/src/plasma/plasma.h | 2 --
cpp/src/plasma/store.cc | 37 +++++++++++++++++++------------------
cpp/src/plasma/store.h | 6 ++++--
3 files changed, 23 insertions(+), 22 deletions(-)
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 6e15807..11c2b2b 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -116,8 +116,6 @@ struct ObjectTableEntry {
~ObjectTableEntry();
- /// Object id of this object.
- ObjectID object_id;
/// Object info like size, creation time and owner.
flatbuf::ObjectInfoT info;
/// Memory mapped file containing the object.
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index f55f3c9..d68f6e3 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -125,9 +125,10 @@ const PlasmaStoreInfo* PlasmaStore::GetPlasmaStoreInfo() { return &store_info_;
// If this client is not already using the object, add the client to the
// object's list of clients, otherwise do nothing.
-void PlasmaStore::AddToClientObjectIds(ObjectTableEntry* entry, Client* client) {
+void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
+ Client* client) {
// Check if this client is already using the object.
- if (client->object_ids.find(entry->object_id) != client->object_ids.end()) {
+ if (client->object_ids.find(object_id) != client->object_ids.end()) {
return;
}
// If there are no other clients using this object, notify the eviction policy
@@ -135,14 +136,14 @@ void PlasmaStore::AddToClientObjectIds(ObjectTableEntry* entry, Client* client)
if (entry->ref_count == 0) {
// Tell the eviction policy that this object is being used.
std::vector<ObjectID> objects_to_evict;
- eviction_policy_.BeginObjectAccess(entry->object_id, &objects_to_evict);
+ eviction_policy_.BeginObjectAccess(object_id, &objects_to_evict);
DeleteObjects(objects_to_evict);
}
// Increase reference count.
entry->ref_count++;
// Add object id to the list of object ids that this client is using.
- client->object_ids.insert(entry->object_id);
+ client->object_ids.insert(object_id);
}
// Create a new object buffer in the hash table.
@@ -204,7 +205,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
assert(fd != -1);
}
auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
- entry->object_id = object_id;
entry->info.object_id = object_id.binary();
entry->info.data_size = data_size;
entry->info.metadata_size = metadata_size;
@@ -233,7 +233,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
// eviction policy does not have an opportunity to evict the object.
eviction_policy_.ObjectCreated(object_id);
// Record that this client is using this object.
- AddToClientObjectIds(store_info_.objects[object_id].get(), client);
+ AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client);
return PlasmaError::OK;
}
@@ -331,7 +331,7 @@ void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) {
get_req->num_satisfied += 1;
// Record the fact that this client will be using this object and will
// be responsible for releasing this object.
- AddToClientObjectIds(entry, get_req->client);
+ AddToClientObjectIds(object_id, entry, get_req->client);
// If this get request is done, reply to the client.
if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
@@ -365,7 +365,7 @@ void PlasmaStore::ProcessGetRequest(Client* client,
get_req->num_satisfied += 1;
// If necessary, record that this client is using this object. In the case
// where entry == NULL, this will be called from SealObject.
- AddToClientObjectIds(entry, client);
+ AddToClientObjectIds(object_id, entry, client);
} else {
// Add a placeholder plasma object to the get request to indicate that the
// object is not present. This will be parsed by the client. We set the
@@ -390,8 +390,9 @@ void PlasmaStore::ProcessGetRequest(Client* client,
}
}
-int PlasmaStore::RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* client) {
- auto it = client->object_ids.find(entry->object_id);
+int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id,
+ ObjectTableEntry* entry, Client* client) {
+ auto it = client->object_ids.find(object_id);
if (it != client->object_ids.end()) {
client->object_ids.erase(it);
// Decrease reference count.
@@ -400,16 +401,16 @@ int PlasmaStore::RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* clie
// If no more clients are using this object, notify the eviction policy
// that the object is no longer being used.
if (entry->ref_count == 0) {
- if (deletion_cache_.count(entry->object_id) == 0) {
+ if (deletion_cache_.count(object_id) == 0) {
// Tell the eviction policy that this object is no longer being used.
std::vector<ObjectID> objects_to_evict;
- eviction_policy_.EndObjectAccess(entry->object_id, &objects_to_evict);
+ eviction_policy_.EndObjectAccess(object_id, &objects_to_evict);
DeleteObjects(objects_to_evict);
} else {
// Above code does not really delete an object. Instead, it just put an
// object to LRU cache which will be cleaned when the memory is not enough.
- deletion_cache_.erase(entry->object_id);
- DeleteObjects({entry->object_id});
+ deletion_cache_.erase(object_id);
+ DeleteObjects({object_id});
}
}
// Return 1 to indicate that the client was removed.
@@ -424,7 +425,7 @@ void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) {
auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != nullptr);
// Remove the client from the object's array of clients.
- ARROW_CHECK(RemoveFromClientObjectIds(entry, client) == 1);
+ ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
}
// Check if an object is present.
@@ -554,7 +555,7 @@ void PlasmaStore::DisconnectClient(int client_fd) {
ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
// Release all the objects that the client was using.
auto client = it->second.get();
- std::vector<ObjectTableEntry*> sealed_objects;
+ std::unordered_map<ObjectID, ObjectTableEntry*> sealed_objects;
for (const auto& object_id : client->object_ids) {
auto it = store_info_.objects.find(object_id);
if (it == store_info_.objects.end()) {
@@ -564,7 +565,7 @@ void PlasmaStore::DisconnectClient(int client_fd) {
if (it->second->state == ObjectState::PLASMA_SEALED) {
// Add sealed objects to a temporary list of object IDs. Do not perform
// the remove here, since it potentially modifies the object_ids table.
- sealed_objects.push_back(it->second.get());
+ sealed_objects[it->first] = it->second.get();
} else {
// Abort unsealed object.
AbortObject(it->first, client);
@@ -572,7 +573,7 @@ void PlasmaStore::DisconnectClient(int client_fd) {
}
for (const auto& entry : sealed_objects) {
- RemoveFromClientObjectIds(entry, client);
+ RemoveFromClientObjectIds(entry.first, entry.second, client);
}
if (client->notification_fd > 0) {
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 40412a8..4b83143 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -180,13 +180,15 @@ class PlasmaStore {
void PushNotification(ObjectInfoT* object_notification, int client_fd);
- void AddToClientObjectIds(ObjectTableEntry* entry, Client* client);
+ void AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
+ Client* client);
void ReturnFromGet(GetRequest* get_req);
void UpdateObjectGetRequests(const ObjectID& object_id);
- int RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* client);
+ int RemoveFromClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
+ Client* client);
/// Event loop of the plasma store.
EventLoop* loop_;