You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/07/04 23:26:14 UTC

[arrow] branch master updated: ARROW-2794: [Plasma] Add the RPC of a list of Delete Objects in Plasma

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 037c156  ARROW-2794: [Plasma] Add the RPC of a list of Delete Objects in Plasma
037c156 is described below

commit 037c156f16332904d4cac8974195aea2861f6c29
Author: Yuhong Guo <yu...@antfin.com>
AuthorDate: Wed Jul 4 16:26:05 2018 -0700

    ARROW-2794: [Plasma] Add the RPC of a list of Delete Objects in Plasma
    
    This pull request includes the following changes:
    1. Add an RPC to delete a list of objects, which could be used for garbage collection in Ray, as proposed in the [Garbage Collection Discussion](https://github.com/ray-project/ray/issues/2242#issuecomment-398450187).
    2. Fix a bug in ReadDeleteRequest: it parsed the message as a PlasmaReleaseReply instead of a PlasmaDeleteRequest.
    
    Author: Yuhong Guo <yu...@antfin.com>
    
    Closes #2174 from guoyuhong/addDeleteObjs and squashes the following commits:
    
    793bc5ba <Yuhong Guo> Change according to comment.
    c8562f64 <Yuhong Guo> Trigger build.
    5f3aafe4 <Yuhong Guo> Change back Delete call.
    7db64131 <Yuhong Guo> Add back Delete function for single obj for client
    d67c830e <Yuhong Guo> Fix warning failure.
    a1655fb1 <Yuhong Guo> Delete a comma.
    bc2efc28 <Yuhong Guo> Change according to comment
    3a3b7862 <Yuhong Guo> Fix warning error.
    c67fb4de <Yuhong Guo> Fix a bug in ReadDeleteRequest
    d3188fb9 <Yuhong Guo> Add Delete Object list RPC to support garbage colletion in Ray
---
 cpp/src/plasma/client.cc                   | 33 ++++++++++++-------
 cpp/src/plasma/client.h                    |  8 +++++
 cpp/src/plasma/format/plasma.fbs           | 10 ++++--
 cpp/src/plasma/protocol.cc                 | 53 +++++++++++++++++++++++-------
 cpp/src/plasma/protocol.h                  | 12 ++++---
 cpp/src/plasma/store.cc                    | 11 +++++--
 cpp/src/plasma/test/client_tests.cc        | 33 +++++++++++++++++--
 cpp/src/plasma/test/serialization_tests.cc | 24 +++++++++-----
 8 files changed, 137 insertions(+), 47 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 8e66cf4..95da089 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -182,7 +182,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
 
   Status Seal(const ObjectID& object_id);
 
-  Status Delete(const ObjectID& object_id);
+  Status Delete(const std::vector<ObjectID>& object_ids);
 
   Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);
 
@@ -808,21 +808,26 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
   return ReadAbortReply(buffer.data(), buffer.size(), &id);
 }
 
-Status PlasmaClient::Impl::Delete(const ObjectID& object_id) {
+Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
   RETURN_NOT_OK(FlushReleaseHistory());
-  // If the object is in used, client can't send the remove message.
-  if (objects_in_use_.count(object_id) > 0) {
-    return Status::UnknownError("PlasmaClient::Object is in use.");
-  } else {
-    // If we don't already have a reference to the object, we can try to remove the object
-    RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id));
+  std::vector<ObjectID> not_in_use_ids;
+  for (const auto& object_id : object_ids) {
+    // Objects this client is still using cannot be deleted; skip them.
+    if (objects_in_use_.count(object_id) == 0) {
+      not_in_use_ids.push_back(object_id);
+    }
+  }
+  if (!not_in_use_ids.empty()) {
+    RETURN_NOT_OK(SendDeleteRequest(store_conn_, not_in_use_ids));
     std::vector<uint8_t> buffer;
     RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaDeleteReply, &buffer));
-    ObjectID object_id2;
     DCHECK_GT(buffer.size(), 0);
-    RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), &object_id2));
-    return Status::OK();
+    std::vector<PlasmaError> error_codes;  // per-object errors are not surfaced
+    std::vector<ObjectID> deleted_ids;
+    RETURN_NOT_OK(
+        ReadDeleteReply(buffer.data(), buffer.size(), &deleted_ids, &error_codes));
   }
+  return Status::OK();
 }
 
 Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
@@ -1036,7 +1041,11 @@ Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(obje
 Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); }
 
 Status PlasmaClient::Delete(const ObjectID& object_id) {
-  return impl_->Delete(object_id);
+  return impl_->Delete(std::vector<ObjectID>{object_id});
+}
+
+Status PlasmaClient::Delete(const std::vector<ObjectID>& object_ids) {
+  return impl_->Delete(object_ids);
 }
 
 Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 5501488..fe00193 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -179,6 +179,14 @@ class ARROW_EXPORT PlasmaClient {
   /// \return The return status.
   Status Delete(const ObjectID& object_id);
 
+  /// Delete a list of objects from the object store. Objects this client is using
+  /// are skipped; for the others, this assumes they are present, sealed and not used
+  /// by another client, otherwise it is a no-op. Per-object errors are not reported.
+  ///
+  /// \param object_ids The list of IDs of the objects to delete.
+  /// \return The return status. If all the objects are non-existent, return OK.
+  Status Delete(const std::vector<ObjectID>& object_ids);
+
   /// Delete objects until we have freed up num_bytes bytes or there are no more
   /// released objects that can be deleted.
   ///
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 0bc7a07..333e7d7 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -200,15 +200,19 @@ table PlasmaReleaseReply {
 }
 
 table PlasmaDeleteRequest {
+  // The number of objects to delete (the length of object_ids).
+  count: int;
   // ID of the object to be deleted.
-  object_id: string;
+  object_ids: [string];
 }
 
 table PlasmaDeleteReply {
+  // The number of objects in this reply (the length of object_ids and errors).
+  count: int;
   // ID of the object that was deleted.
-  object_id: string;
+  object_ids: [string];
   // Error code.
-  error: PlasmaError;
+  errors: [PlasmaError];
 }
 
 table PlasmaStatusRequest {
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 0503e10..0d55505 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -50,6 +50,18 @@ Status PlasmaReceive(int sock, MessageType message_type, std::vector<uint8_t>* b
   return Status::OK();
 }
 
+// Helper function to create a vector of elements from Data (Request/Reply struct).
+// `getter(request, i)` returns element i; `request.count()` gives the element count.
+template <typename T, typename Data, typename Getter>
+void to_vector(const Data& request, std::vector<T>* out, const Getter& getter) {
+  int count = request.count();  // NOTE: trusts `count` to match the vector lengths
+  out->clear();
+  out->reserve(count);
+  for (int i = 0; i < count; ++i) {
+    out->push_back(getter(request, i));
+  }
+}
+
 template <typename Message>
 Status PlasmaSend(int sock, MessageType message_type, flatbuffers::FlatBufferBuilder* fbb,
                   const Message& message) {
@@ -230,35 +242,52 @@ Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
   return plasma_error_status(message->error());
 }
 
-// Delete messages.
+// Delete objects messages. Vectors are passed via data(), which is safe when empty.
 
-Status SendDeleteRequest(int sock, ObjectID object_id) {
+Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids) {
   flatbuffers::FlatBufferBuilder fbb;
-  auto message = CreatePlasmaDeleteRequest(fbb, fbb.CreateString(object_id.binary()));
+  auto message = CreatePlasmaDeleteRequest(
+      fbb, static_cast<int32_t>(object_ids.size()),
+      to_flatbuffer(&fbb, object_ids.data(), object_ids.size()));
   return PlasmaSend(sock, MessageType::PlasmaDeleteRequest, &fbb, message);
 }
 
-Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) {
+Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids) {
   DCHECK(data);
-  auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
+  DCHECK(object_ids);
+  auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
   DCHECK(verify_flatbuffer(message, data, size));
-  *object_id = ObjectID::from_binary(message->object_id()->str());
+  to_vector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) {
+    return ObjectID::from_binary(request.object_ids()->Get(i)->str());
+  });
   return Status::OK();
 }
 
-Status SendDeleteReply(int sock, ObjectID object_id, PlasmaError error) {
+Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
+                       const std::vector<PlasmaError>& errors) {
+  DCHECK_EQ(object_ids.size(), errors.size());
   flatbuffers::FlatBufferBuilder fbb;
-  auto message =
-      CreatePlasmaDeleteReply(fbb, fbb.CreateString(object_id.binary()), error);
+  auto message = CreatePlasmaDeleteReply(
+      fbb, static_cast<int32_t>(object_ids.size()),
+      to_flatbuffer(&fbb, object_ids.data(), object_ids.size()),
+      fbb.CreateVector(reinterpret_cast<const int32_t*>(errors.data()), errors.size()));
   return PlasmaSend(sock, MessageType::PlasmaDeleteReply, &fbb, message);
 }
 
-Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) {
+Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
+                       std::vector<PlasmaError>* errors) {
   DCHECK(data);
+  DCHECK(object_ids);
+  DCHECK(errors);
   auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
   DCHECK(verify_flatbuffer(message, data, size));
-  *object_id = ObjectID::from_binary(message->object_id()->str());
-  return plasma_error_status(message->error());
+  to_vector(*message, object_ids, [](const PlasmaDeleteReply& reply, int i) {
+    return ObjectID::from_binary(reply.object_ids()->Get(i)->str());
+  });
+  to_vector(*message, errors, [](const PlasmaDeleteReply& reply, int i) {
+    return static_cast<PlasmaError>(reply.errors()->Get(i));
+  });
+  return Status::OK();
 }
 
 // Satus messages.
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 2b477a8..1e34343 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -100,15 +100,17 @@ Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error);
 
 Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id);
 
-/* Plasma Delete message functions. */
+/* Plasma Delete message functions: each message carries a list of object IDs. */
 
-Status SendDeleteRequest(int sock, ObjectID object_id);
+Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids);
 
-Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id);
+Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids);
 
-Status SendDeleteReply(int sock, ObjectID object_id, PlasmaError error);
+Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
+                       const std::vector<PlasmaError>& errors);
 
-Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id);
+Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
+                       std::vector<PlasmaError>* errors);
 
 /* Satus messages. */
 
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index e86db21..c8bf466 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -742,9 +742,14 @@ Status PlasmaStore::process_message(Client* client) {
       release_object(object_id, client);
     } break;
     case MessageType::PlasmaDeleteRequest: {
-      RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_id));
-      PlasmaError error_code = delete_object(object_id);
-      HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_id, error_code), client->fd);
+      std::vector<ObjectID> object_ids;
+      RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids));
+      std::vector<PlasmaError> error_codes;
+      error_codes.reserve(object_ids.size());  // one error code per requested ID
+      for (auto& id : object_ids) {
+        error_codes.push_back(delete_object(id));
+      }
+      HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_ids, error_codes), client->fd);
     } break;
     case MessageType::PlasmaContainsRequest: {
       RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index fa7de04..e40f6d9 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -165,7 +165,7 @@ TEST_F(TestPlasmaStore, DeleteTest) {
 
   // Test for deleting non-existance object.
   Status result = client_.Delete(object_id);
-  ASSERT_TRUE(result.IsPlasmaObjectNonexistent());
+  ARROW_CHECK_OK(result);
 
   // Test for the object being in local Plasma store.
   // First create object.
@@ -176,15 +176,42 @@ TEST_F(TestPlasmaStore, DeleteTest) {
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
   ARROW_CHECK_OK(client_.Seal(object_id));
 
-  // Object is in use, can't be delete.
   result = client_.Delete(object_id);
-  ASSERT_TRUE(result.IsUnknownError());
+  // TODO: Guarantee that the in-use object will be deleted when it is released.
+  ARROW_CHECK_OK(result);
 
   // Avoid race condition of Plasma Manager waiting for notification.
   ARROW_CHECK_OK(client_.Release(object_id));
   ARROW_CHECK_OK(client_.Delete(object_id));
 }
 
+TEST_F(TestPlasmaStore, DeleteObjectsTest) {
+  ObjectID object_id1 = ObjectID::from_random();
+  ObjectID object_id2 = ObjectID::from_random();
+
+  // Deleting objects that do not exist is not an error.
+  Status result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
+  ARROW_CHECK_OK(result);
+  // Test for objects that are in the local Plasma store.
+  // First create the objects.
+  int64_t data_size = 100;
+  uint8_t metadata[] = {5};
+  int64_t metadata_size = sizeof(metadata);
+  std::shared_ptr<Buffer> data;
+  ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
+  ARROW_CHECK_OK(client_.Seal(object_id1));
+  ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data));
+  ARROW_CHECK_OK(client_.Seal(object_id2));
+  // Both objects are in use by this client, so Delete skips them and returns OK.
+  result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
+  // TODO: Guarantee that the in-use object will be deleted when it is released.
+  ARROW_CHECK_OK(result);
+  // Avoid race condition of Plasma Manager waiting for notification.
+  ARROW_CHECK_OK(client_.Release(object_id1));
+  ARROW_CHECK_OK(client_.Release(object_id2));
+  ARROW_CHECK_OK(client_.Delete(std::vector<ObjectID>{object_id1, object_id2}));
+}
+
 TEST_F(TestPlasmaStore, ContainsTest) {
   ObjectID object_id = ObjectID::from_random();
 
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index e05c1b4..1b445f2 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -223,12 +223,13 @@ TEST(PlasmaSerialization, ReleaseReply) {
 TEST(PlasmaSerialization, DeleteRequest) {
   int fd = create_temp_file();
   ObjectID object_id1 = ObjectID::from_random();
-  ARROW_CHECK_OK(SendDeleteRequest(fd, object_id1));
+  ARROW_CHECK_OK(SendDeleteRequest(fd, std::vector<ObjectID>{object_id1}));
   std::vector<uint8_t> data =
       read_message_from_file(fd, MessageType::PlasmaDeleteRequest);
-  ObjectID object_id2;
-  ARROW_CHECK_OK(ReadDeleteRequest(data.data(), data.size(), &object_id2));
-  ASSERT_EQ(object_id1, object_id2);
+  std::vector<ObjectID> object_vec;
+  ARROW_CHECK_OK(ReadDeleteRequest(data.data(), data.size(), &object_vec));
+  ASSERT_EQ(1U, object_vec.size());
+  ASSERT_EQ(object_id1, object_vec[0]);
   close(fd);
 }
 
@@ -236,12 +237,17 @@ TEST(PlasmaSerialization, DeleteReply) {
   int fd = create_temp_file();
   ObjectID object_id1 = ObjectID::from_random();
   PlasmaError error1 = PlasmaError::ObjectExists;
-  ARROW_CHECK_OK(SendDeleteReply(fd, object_id1, error1));
+  ARROW_CHECK_OK(SendDeleteReply(fd, std::vector<ObjectID>{object_id1},
+                                 std::vector<PlasmaError>{error1}));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaDeleteReply);
-  ObjectID object_id2;
-  Status s = ReadDeleteReply(data.data(), data.size(), &object_id2);
-  ASSERT_EQ(object_id1, object_id2);
-  ASSERT_TRUE(s.IsPlasmaObjectExists());
+  std::vector<ObjectID> object_vec;
+  std::vector<PlasmaError> error_vec;
+  Status s = ReadDeleteReply(data.data(), data.size(), &object_vec, &error_vec);
+  ASSERT_TRUE(s.ok());
+  ASSERT_EQ(1U, object_vec.size());
+  ASSERT_EQ(object_id1, object_vec[0]);
+  ASSERT_EQ(1U, error_vec.size());
+  ASSERT_TRUE(error_vec[0] == error1);
   close(fd);
 }