You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/01/10 23:32:19 UTC

[arrow] branch master updated: ARROW-1927: [Plasma] Add delete function

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f82b7e4  ARROW-1927: [Plasma] Add delete function
f82b7e4 is described below

commit f82b7e4f57bf6d5aa283823f483fabdda59d56ad
Author: Jin Hai <ha...@gmail.com>
AuthorDate: Wed Jan 10 15:32:11 2018 -0800

    ARROW-1927: [Plasma] Add delete function
    
    Hi, I just added the delete function for Plasma and tested it. JIRA ticket:
    https://issues.apache.org/jira/browse/ARROW-1927
    
    Author: Jin Hai <ha...@gmail.com>
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #1427 from JinHai-CN/plasma-delete and squashes the following commits:
    
    c6df5be [Philipp Moritz] rebase
    424c1b7 [Philipp Moritz] fix linting
    1d76437 [Philipp Moritz] fix tests
    0ca115a [Jin Hai] Fixed two bugs according to the comments
    ce27077 [Jin Hai] Update the unit test cases
    8b6804e [Jin Hai] ARROW-1927: [Plasma] Try to fix unit-test fault
    be88990 [Jin Hai] ARROW-1927: [Plasma] Add 3 test cases for delete function
    baf82b9 [Jin Hai] ARROW-1927: [Plasma] Update according to the CI error
    53e24eb [Jin Hai] ARROW-1927: [Plasma] Update according to the comments and CI error
    c9984a4 [Jin Hai] ARROW-1927: [Plasma] Add delete function
---
 cpp/src/plasma/client.cc            | 17 +++++++++++---
 cpp/src/plasma/client.h             |  3 ++-
 cpp/src/plasma/eviction_policy.cc   | 10 ++++++++
 cpp/src/plasma/eviction_policy.h    |  5 ++++
 cpp/src/plasma/format/plasma.fbs    |  6 ++++-
 cpp/src/plasma/store.cc             | 46 +++++++++++++++++++++++++++++++++----
 cpp/src/plasma/store.h              |  9 ++++++++
 cpp/src/plasma/test/client_tests.cc | 25 ++++++++++++++++++++
 8 files changed, 112 insertions(+), 9 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 0dd1c44..d74c0f4 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -513,9 +513,20 @@ Status PlasmaClient::Abort(const ObjectID& object_id) {
 }
 
 Status PlasmaClient::Delete(const ObjectID& object_id) {
-  // TODO(rkn): In the future, we can use this method to give hints to the
-  // eviction policy about when an object will no longer be needed.
-  return Status::NotImplemented("PlasmaClient::Delete is not implemented.");
+  RETURN_NOT_OK(FlushReleaseHistory());
+  // If the object is in use, the client can't send the delete message.
+  if (objects_in_use_.count(object_id) > 0) {
+    return Status::UnknownError("PlasmaClient::Object is in use.");
+  } else {
+    // If we don't already have a reference to the object, we can try to remove the object
+    RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id));
+    std::vector<uint8_t> buffer;
+    RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaDeleteReply, &buffer));
+    ObjectID object_id2;
+    DCHECK_GT(buffer.size(), 0);
+    RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), &object_id2));
+    return Status::OK();
+  }
 }
 
 Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 78793f1..35182f8 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -174,7 +174,8 @@ class ARROW_EXPORT PlasmaClient {
   Status Seal(const ObjectID& object_id);
 
   /// Delete an object from the object store. This currently assumes that the
-  /// object is present and has been sealed.
+  /// object is present, has been sealed, and is not used by another client. Otherwise,
+  /// it is a no-op.
   ///
   /// @todo We may want to allow the deletion of objects that are not present or
   ///       haven't been sealed.
diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
index a7758fd..66a3b2e 100644
--- a/cpp/src/plasma/eviction_policy.cc
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -102,4 +102,14 @@ void EvictionPolicy::end_object_access(const ObjectID& object_id,
   cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
 }
 
+void EvictionPolicy::remove_object(const ObjectID& object_id) {
+  /* If the object is in the LRU cache, remove it. */
+  cache_.remove(object_id);
+
+  auto entry = store_info_->objects[object_id].get();
+  int64_t size = entry->info.data_size + entry->info.metadata_size;
+  ARROW_CHECK(memory_used_ >= size);
+  memory_used_ -= size;
+}
+
 }  // namespace plasma
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
index cebf35b..b076309 100644
--- a/cpp/src/plasma/eviction_policy.h
+++ b/cpp/src/plasma/eviction_policy.h
@@ -120,6 +120,11 @@ class EvictionPolicy {
   int64_t choose_objects_to_evict(int64_t num_bytes_required,
                                   std::vector<ObjectID>* objects_to_evict);
 
+  /// This method will be called when an object is going to be removed.
+  ///
+  /// @param object_id The ID of the object that is being removed.
+  void remove_object(const ObjectID& object_id);
+
  private:
   /// The amount of memory (in bytes) currently being used.
   int64_t memory_used_;
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index b6d03b8..ea6dc8b 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -76,7 +76,11 @@ enum PlasmaError:int {
   // Trying to access an object that doesn't exist.
   ObjectNonexistent,
   // Trying to create an object but there isn't enough space in the store.
-  OutOfMemory
+  OutOfMemory,
+  // Trying to delete an object but it's not sealed.
+  ObjectNotSealed,
+  // Trying to delete an object but it's in use.
+  ObjectInUse
 }
 
 // Plasma store messages
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index c6a19a5..dde7f9c 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -411,6 +411,39 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
   }
 }
 
+int PlasmaStore::delete_object(ObjectID& object_id) {
+  auto entry = get_object_table_entry(&store_info_, object_id);
+  // TODO(rkn): This should probably not fail, but should instead throw an
+  // error. Maybe we should also support deleting objects that have been
+  // created but not sealed.
+  if (entry == NULL) {
+    // To delete an object it must be in the object table.
+    return PlasmaError_ObjectNonexistent;
+  }
+
+  if (entry->state != PLASMA_SEALED) {
+    // To delete an object it must have been sealed.
+    return PlasmaError_ObjectNotSealed;
+  }
+
+  if (entry->clients.size() != 0) {
+    // To delete an object, there must be no clients currently using it.
+    return PlasmaError_ObjectInUse;
+  }
+
+  eviction_policy_.remove_object(object_id);
+
+  dlfree(entry->pointer);
+  store_info_.objects.erase(object_id);
+  // Inform all subscribers that the object has been deleted.
+  ObjectInfoT notification;
+  notification.object_id = object_id.binary();
+  notification.is_deletion = true;
+  push_notification(&notification);
+
+  return PlasmaError_OK;
+}
+
 void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
   for (const auto& object_id : object_ids) {
     ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
@@ -626,18 +659,23 @@ Status PlasmaStore::process_message(Client* client) {
       RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms));
       process_get_request(client, object_ids_to_get, timeout_ms);
     } break;
-    case MessageType_PlasmaReleaseRequest:
+    case MessageType_PlasmaReleaseRequest: {
       RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
       release_object(object_id, client);
-      break;
-    case MessageType_PlasmaContainsRequest:
+    } break;
+    case MessageType_PlasmaDeleteRequest: {
+      RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_id));
+      int error_code = delete_object(object_id);
+      HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_id, error_code), client->fd);
+    } break;
+    case MessageType_PlasmaContainsRequest: {
       RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
       if (contains_object(object_id) == OBJECT_FOUND) {
         HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd);
       } else {
         HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
       }
-      break;
+    } break;
     case MessageType_PlasmaSealRequest: {
       unsigned char digest[kDigestSize];
       RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0]));
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index a72c625..7eada5a 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -83,6 +83,15 @@ class PlasmaStore {
   /// @return 1 if the abort succeeds, else 0.
   int abort_object(const ObjectID& object_id, Client* client);
 
+  /// Delete a specific object, by object_id, that has been created in the hash table.
+  ///
+  /// @param object_id Object ID of the object to be deleted.
+  /// @return One of the following error codes:
+  ///  - PlasmaError_OK, if the object was deleted successfully.
+  ///  - PlasmaError_ObjectNonexistent, if the object doesn't exist.
+  ///  - PlasmaError_ObjectInUse, if the object is in use.
+  int delete_object(ObjectID& object_id);
+
   /// Delete objects that have been created in the hash table. This should only
   /// be called on objects that are returned by the eviction policy to evict.
   ///
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 5cd3063..f19c2bf 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -58,6 +58,31 @@ class TestPlasmaStore : public ::testing::Test {
   PlasmaClient client2_;
 };
 
+TEST_F(TestPlasmaStore, DeleteTest) {
+  ObjectID object_id = ObjectID::from_random();
+
+  // Test for deleting a nonexistent object.
+  Status result = client_.Delete(object_id);
+  ASSERT_EQ(result.IsPlasmaObjectNonexistent(), true);
+
+  // Test for the object being in local Plasma store.
+  // First create object.
+  int64_t data_size = 100;
+  uint8_t metadata[] = {5};
+  int64_t metadata_size = sizeof(metadata);
+  std::shared_ptr<Buffer> data;
+  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+  ARROW_CHECK_OK(client_.Seal(object_id));
+
+  // Object is in use, so it can't be deleted.
+  result = client_.Delete(object_id);
+  ASSERT_EQ(result.IsUnknownError(), true);
+
+  // Avoid race condition of Plasma Manager waiting for notification.
+  ARROW_CHECK_OK(client_.Release(object_id));
+  ARROW_CHECK_OK(client_.Delete(object_id));
+}
+
 TEST_F(TestPlasmaStore, ContainsTest) {
   ObjectID object_id = ObjectID::from_random();
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@arrow.apache.org" <co...@arrow.apache.org>'].