You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/01/10 23:32:19 UTC
[arrow] branch master updated: ARROW-1927: [Plasma] Add delete
function
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new f82b7e4 ARROW-1927: [Plasma] Add delete function
f82b7e4 is described below
commit f82b7e4f57bf6d5aa283823f483fabdda59d56ad
Author: Jin Hai <ha...@gmail.com>
AuthorDate: Wed Jan 10 15:32:11 2018 -0800
ARROW-1927: [Plasma] Add delete function
Hi, I just added the delete function for Plasma and tested it. JIRA ticket:
https://issues.apache.org/jira/browse/ARROW-1927
Author: Jin Hai <ha...@gmail.com>
Author: Philipp Moritz <pc...@gmail.com>
Closes #1427 from JinHai-CN/plasma-delete and squashes the following commits:
c6df5be [Philipp Moritz] rebase
424c1b7 [Philipp Moritz] fix linting
1d76437 [Philipp Moritz] fix tests
0ca115a [Jin Hai] Fixed two bugs according to the comments
ce27077 [Jin Hai] Update the unit test cases
8b6804e [Jin Hai] ARROW-1927: [Plasma] Try to fix unit-test fault
be88990 [Jin Hai] ARROW-1927: [Plasma] Add 3 test cases for delete function
baf82b9 [Jin Hai] ARROW-1927: [Plasma] Update according to the CI error
53e24eb [Jin Hai] ARROW-1927: [Plasma] Update according to the comments and CI error
c9984a4 [Jin Hai] ARROW-1927: [Plasma] Add delete function
---
cpp/src/plasma/client.cc | 17 +++++++++++---
cpp/src/plasma/client.h | 3 ++-
cpp/src/plasma/eviction_policy.cc | 10 ++++++++
cpp/src/plasma/eviction_policy.h | 5 ++++
cpp/src/plasma/format/plasma.fbs | 6 ++++-
cpp/src/plasma/store.cc | 46 +++++++++++++++++++++++++++++++++----
cpp/src/plasma/store.h | 9 ++++++++
cpp/src/plasma/test/client_tests.cc | 25 ++++++++++++++++++++
8 files changed, 112 insertions(+), 9 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 0dd1c44..d74c0f4 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -513,9 +513,20 @@ Status PlasmaClient::Abort(const ObjectID& object_id) {
}
Status PlasmaClient::Delete(const ObjectID& object_id) {
- // TODO(rkn): In the future, we can use this method to give hints to the
- // eviction policy about when an object will no longer be needed.
- return Status::NotImplemented("PlasmaClient::Delete is not implemented.");
+ RETURN_NOT_OK(FlushReleaseHistory());
+ // If this client is still using the object, fail locally without sending a delete request.
+ if (objects_in_use_.count(object_id) > 0) {
+ return Status::UnknownError("PlasmaClient::Object is in use.");
+ } else {
+ // This client holds no reference to the object, so ask the store to delete it.
+ RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id));
+ std::vector<uint8_t> buffer;
+ RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaDeleteReply, &buffer));
+ ObjectID object_id2;
+ DCHECK_GT(buffer.size(), 0);
+ RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), &object_id2));
+ return Status::OK();
+ }
}
Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 78793f1..35182f8 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -174,7 +174,8 @@ class ARROW_EXPORT PlasmaClient {
Status Seal(const ObjectID& object_id);
/// Delete an object from the object store. This currently assumes that the
- /// object is present and has been sealed.
+ /// object is present, has been sealed, and is not in use by any client.
+ /// Otherwise the object is not deleted and an error Status is returned.
///
/// @todo We may want to allow the deletion of objects that are not present or
/// haven't been sealed.
diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
index a7758fd..66a3b2e 100644
--- a/cpp/src/plasma/eviction_policy.cc
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -102,4 +102,14 @@ void EvictionPolicy::end_object_access(const ObjectID& object_id,
cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
}
+void EvictionPolicy::remove_object(const ObjectID& object_id) {
+ /* If the object is in the LRU cache, remove it. */
+ cache_.remove(object_id);
+
+ /* Stop accounting for the object's data + metadata bytes in memory_used_.
+ * NOTE(review): objects[] presumably default-inserts an empty entry when
+ * object_id is absent, so the caller must ensure the object is still in the
+ * object table (PlasmaStore::delete_object checks this before calling). */
+ auto entry = store_info_->objects[object_id].get();
+ int64_t size = entry->info.data_size + entry->info.metadata_size;
+ ARROW_CHECK(memory_used_ >= size);
+ memory_used_ -= size;
+}
+
} // namespace plasma
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
index cebf35b..b076309 100644
--- a/cpp/src/plasma/eviction_policy.h
+++ b/cpp/src/plasma/eviction_policy.h
@@ -120,6 +120,11 @@ class EvictionPolicy {
int64_t choose_objects_to_evict(int64_t num_bytes_required,
std::vector<ObjectID>* objects_to_evict);
+ /// This method will be called when an object is going to be removed from the
+ /// store. It drops the object from the cache and from the memory accounting.
+ ///
+ /// @param object_id The ID of the object that is being removed.
+ void remove_object(const ObjectID& object_id);
+
private:
/// The amount of memory (in bytes) currently being used.
int64_t memory_used_;
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index b6d03b8..ea6dc8b 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -76,7 +76,11 @@ enum PlasmaError:int {
// Trying to access an object that doesn't exist.
ObjectNonexistent,
// Trying to create an object but there isn't enough space in the store.
- OutOfMemory
+ OutOfMemory,
+ // Trying to delete an object that has not been sealed yet.
+ ObjectNotSealed,
+ // Trying to delete an object that is still in use by a client.
+ ObjectInUse
}
// Plasma store messages
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index c6a19a5..dde7f9c 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -411,6 +411,39 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
}
}
+int PlasmaStore::delete_object(ObjectID& object_id) {
+ auto entry = get_object_table_entry(&store_info_, object_id);
+ // Deletion is refused (an error code is returned) if the object is unknown,
+ // not yet sealed, or still in use. TODO: consider supporting deletion of
+ // objects that have been created but not sealed.
+ if (entry == NULL) {
+ // To delete an object it must be in the object table.
+ return PlasmaError_ObjectNonexistent;
+ }
+
+ if (entry->state != PLASMA_SEALED) {
+ // To delete an object it must have been sealed.
+ return PlasmaError_ObjectNotSealed;
+ }
+
+ if (entry->clients.size() != 0) {
+ // To delete an object, there must be no clients currently using it.
+ return PlasmaError_ObjectInUse;
+ }
+
+ // Update the eviction policy first: remove_object reads the object's
+ // table entry to compute its size, so it must run before the erase below.
+ eviction_policy_.remove_object(object_id);
+
+ dlfree(entry->pointer);
+ store_info_.objects.erase(object_id);
+ // Inform all subscribers that the object has been deleted.
+ ObjectInfoT notification;
+ notification.object_id = object_id.binary();
+ notification.is_deletion = true;
+ push_notification(&notification);
+
+ return PlasmaError_OK;
+}
+
void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
for (const auto& object_id : object_ids) {
ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
@@ -626,18 +659,23 @@ Status PlasmaStore::process_message(Client* client) {
RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms));
process_get_request(client, object_ids_to_get, timeout_ms);
} break;
- case MessageType_PlasmaReleaseRequest:
+ case MessageType_PlasmaReleaseRequest: {
RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
release_object(object_id, client);
- break;
- case MessageType_PlasmaContainsRequest:
+ } break;
+ case MessageType_PlasmaDeleteRequest: {
+ RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_id));
+ int error_code = delete_object(object_id);
+ HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_id, error_code), client->fd);
+ } break;
+ case MessageType_PlasmaContainsRequest: {
RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
if (contains_object(object_id) == OBJECT_FOUND) {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd);
} else {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
}
- break;
+ } break;
case MessageType_PlasmaSealRequest: {
unsigned char digest[kDigestSize];
RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0]));
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index a72c625..7eada5a 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -83,6 +83,15 @@ class PlasmaStore {
/// @return 1 if the abort succeeds, else 0.
int abort_object(const ObjectID& object_id, Client* client);
+ /// Delete the object with the given ID from the object table.
+ ///
+ /// @param object_id Object ID of the object to be deleted.
+ /// @return One of the following error codes:
+ /// - PlasmaError_OK, if the object was deleted successfully.
+ /// - PlasmaError_ObjectNonexistent, if the object does not exist.
+ /// - PlasmaError_ObjectNotSealed, if the object has not been sealed.
+ /// - PlasmaError_ObjectInUse, if the object is in use by a client.
+ int delete_object(ObjectID& object_id);
+
/// Delete objects that have been created in the hash table. This should only
/// be called on objects that are returned by the eviction policy to evict.
///
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 5cd3063..f19c2bf 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -58,6 +58,31 @@ class TestPlasmaStore : public ::testing::Test {
PlasmaClient client2_;
};
+TEST_F(TestPlasmaStore, DeleteTest) {
+ ObjectID object_id = ObjectID::from_random();
+
+ // Deleting a nonexistent object must fail.
+ Status result = client_.Delete(object_id);
+ ASSERT_EQ(result.IsPlasmaObjectNonexistent(), true);
+
+ // Create and seal an object in the local Plasma store; client_ keeps
+ // using it until it is released.
+ int64_t data_size = 100;
+ uint8_t metadata[] = {5};
+ int64_t metadata_size = sizeof(metadata);
+ std::shared_ptr<Buffer> data;
+ ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+ ARROW_CHECK_OK(client_.Seal(object_id));
+
+ // The object is in use by client_, so it can't be deleted.
+ result = client_.Delete(object_id);
+ ASSERT_EQ(result.IsUnknownError(), true);
+
+ // After releasing the object no client uses it, so deletion succeeds.
+ ARROW_CHECK_OK(client_.Release(object_id));
+ ARROW_CHECK_OK(client_.Delete(object_id));
+}
+
TEST_F(TestPlasmaStore, ContainsTest) {
ObjectID object_id = ObjectID::from_random();
--
To stop receiving notification emails like this one, please contact
"commits@arrow.apache.org" <co...@arrow.apache.org>.