You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/07/04 23:26:14 UTC
[arrow] branch master updated: ARROW-2794: [Plasma] Add the RPC of
a list of Delete Objects in Plasma
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 037c156 ARROW-2794: [Plasma] Add the RPC of a list of Delete Objects in Plasma
037c156 is described below
commit 037c156f16332904d4cac8974195aea2861f6c29
Author: Yuhong Guo <yu...@antfin.com>
AuthorDate: Wed Jul 4 16:26:05 2018 -0700
ARROW-2794: [Plasma] Add the RPC of a list of Delete Objects in Plasma
This pull request includes the following changes:
1. Add an RPC to delete a list of objects, which could be used for garbage collection in Ray according to the [Garbage Collection Discussion](https://github.com/ray-project/ray/issues/2242#issuecomment-398450187).
2. Fix a bug in ReadDeleteRequest: it parsed the message as the wrong type, PlasmaReleaseReply, and now uses PlasmaDeleteRequest.
Author: Yuhong Guo <yu...@antfin.com>
Closes #2174 from guoyuhong/addDeleteObjs and squashes the following commits:
793bc5ba <Yuhong Guo> Change according to comment.
c8562f64 <Yuhong Guo> Trigger build.
5f3aafe4 <Yuhong Guo> Change back Delete call.
7db64131 <Yuhong Guo> Add back Delete function for single obj for client
d67c830e <Yuhong Guo> Fix warning failure.
a1655fb1 <Yuhong Guo> Delete a comma.
bc2efc28 <Yuhong Guo> Change according to comment
3a3b7862 <Yuhong Guo> Fix warning error.
c67fb4de <Yuhong Guo> Fix a bug in ReadDeleteRequest
d3188fb9 <Yuhong Guo> Add Delete Object list RPC to support garbage collection in Ray
---
cpp/src/plasma/client.cc | 33 ++++++++++++-------
cpp/src/plasma/client.h | 8 +++++
cpp/src/plasma/format/plasma.fbs | 10 ++++--
cpp/src/plasma/protocol.cc | 53 +++++++++++++++++++++++-------
cpp/src/plasma/protocol.h | 12 ++++---
cpp/src/plasma/store.cc | 11 +++++--
cpp/src/plasma/test/client_tests.cc | 33 +++++++++++++++++--
cpp/src/plasma/test/serialization_tests.cc | 24 +++++++++-----
8 files changed, 137 insertions(+), 47 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 8e66cf4..95da089 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -182,7 +182,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
Status Seal(const ObjectID& object_id);
- Status Delete(const ObjectID& object_id);
+ Status Delete(const std::vector<ObjectID>& object_ids);
Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);
@@ -808,21 +808,26 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
return ReadAbortReply(buffer.data(), buffer.size(), &id);
}
-Status PlasmaClient::Impl::Delete(const ObjectID& object_id) {
+Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
RETURN_NOT_OK(FlushReleaseHistory());
- // If the object is in used, client can't send the remove message.
- if (objects_in_use_.count(object_id) > 0) {
- return Status::UnknownError("PlasmaClient::Object is in use.");
- } else {
- // If we don't already have a reference to the object, we can try to remove the object
- RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id));
+ std::vector<ObjectID> not_in_use_ids;
+ for (auto& object_id : object_ids) {
+ // If the object is in used, skip it.
+ if (objects_in_use_.count(object_id) == 0) {
+ not_in_use_ids.push_back(object_id);
+ }
+ }
+ if (not_in_use_ids.size() > 0) {
+ RETURN_NOT_OK(SendDeleteRequest(store_conn_, not_in_use_ids));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaDeleteReply, &buffer));
- ObjectID object_id2;
DCHECK_GT(buffer.size(), 0);
- RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), &object_id2));
- return Status::OK();
+ std::vector<PlasmaError> error_codes;
+ not_in_use_ids.clear();
+ RETURN_NOT_OK(
+ ReadDeleteReply(buffer.data(), buffer.size(), ¬_in_use_ids, &error_codes));
}
+ return Status::OK();
}
Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
@@ -1036,7 +1041,11 @@ Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(obje
Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); }
Status PlasmaClient::Delete(const ObjectID& object_id) {
- return impl_->Delete(object_id);
+ return impl_->Delete(std::vector<ObjectID>{object_id});
+}
+
+Status PlasmaClient::Delete(const std::vector<ObjectID>& object_ids) {
+ return impl_->Delete(object_ids);
}
Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 5501488..fe00193 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -179,6 +179,14 @@ class ARROW_EXPORT PlasmaClient {
/// \return The return status.
Status Delete(const ObjectID& object_id);
+ /// Delete a list of objects from the object store. This currently assumes that the
+ /// object is present, has been sealed and not used by another client. Otherwise,
+ /// it is a no operation.
+ ///
+ /// \param object_ids The list of IDs of the objects to delete.
+ /// \return The return status. If all the objects are non-existent, return OK.
+ Status Delete(const std::vector<ObjectID>& object_ids);
+
/// Delete objects until we have freed up num_bytes bytes or there are no more
/// released objects that can be deleted.
///
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 0bc7a07..333e7d7 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -200,15 +200,19 @@ table PlasmaReleaseReply {
}
table PlasmaDeleteRequest {
+ // The number of objects to delete.
+ count: int;
// ID of the object to be deleted.
- object_id: string;
+ object_ids: [string];
}
table PlasmaDeleteReply {
+ // The number of objects to delete.
+ count: int;
// ID of the object that was deleted.
- object_id: string;
+ object_ids: [string];
// Error code.
- error: PlasmaError;
+ errors: [PlasmaError];
}
table PlasmaStatusRequest {
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 0503e10..0d55505 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -50,6 +50,18 @@ Status PlasmaReceive(int sock, MessageType message_type, std::vector<uint8_t>* b
return Status::OK();
}
+// Helper function to create a vector of elements from Data (Request/Reply struct).
+// The Getter function is used to extract one element from Data.
+template <typename T, typename Data, typename Getter>
+void to_vector(const Data& request, std::vector<T>* out, const Getter& getter) {
+ int count = request.count();
+ out->clear();
+ out->reserve(count);
+ for (int i = 0; i < count; ++i) {
+ out->push_back(getter(request, i));
+ }
+}
+
template <typename Message>
Status PlasmaSend(int sock, MessageType message_type, flatbuffers::FlatBufferBuilder* fbb,
const Message& message) {
@@ -230,35 +242,52 @@ Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
return plasma_error_status(message->error());
}
-// Delete messages.
+// Delete objects messages.
-Status SendDeleteRequest(int sock, ObjectID object_id) {
+Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaDeleteRequest(fbb, fbb.CreateString(object_id.binary()));
+ auto message =
+ CreatePlasmaDeleteRequest(fbb, static_cast<int32_t>(object_ids.size()),
+ to_flatbuffer(&fbb, &object_ids[0], object_ids.size()));
return PlasmaSend(sock, MessageType::PlasmaDeleteRequest, &fbb, message);
}
-Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) {
+Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
+ DCHECK(object_ids);
+ auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
- *object_id = ObjectID::from_binary(message->object_id()->str());
+ to_vector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) {
+ return ObjectID::from_binary(request.object_ids()->Get(i)->str());
+ });
return Status::OK();
}
-Status SendDeleteReply(int sock, ObjectID object_id, PlasmaError error) {
+Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
+ const std::vector<PlasmaError>& errors) {
+ DCHECK(object_ids.size() == errors.size());
flatbuffers::FlatBufferBuilder fbb;
- auto message =
- CreatePlasmaDeleteReply(fbb, fbb.CreateString(object_id.binary()), error);
+ auto message = CreatePlasmaDeleteReply(
+ fbb, static_cast<int32_t>(object_ids.size()),
+ to_flatbuffer(&fbb, &object_ids[0], object_ids.size()),
+ fbb.CreateVector(reinterpret_cast<const int32_t*>(&errors[0]), object_ids.size()));
return PlasmaSend(sock, MessageType::PlasmaDeleteReply, &fbb, message);
}
-Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) {
+Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
+ std::vector<PlasmaError>* errors) {
DCHECK(data);
+ DCHECK(object_ids);
+ DCHECK(errors);
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
- *object_id = ObjectID::from_binary(message->object_id()->str());
- return plasma_error_status(message->error());
+ to_vector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) {
+ return ObjectID::from_binary(request.object_ids()->Get(i)->str());
+ });
+ to_vector(*message, errors, [](const PlasmaDeleteReply& request, int i) {
+ return static_cast<PlasmaError>(request.errors()->data()[i]);
+ });
+ return Status::OK();
}
// Satus messages.
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 2b477a8..1e34343 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -100,15 +100,17 @@ Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error);
Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id);
-/* Plasma Delete message functions. */
+/* Plasma Delete objects message functions. */
-Status SendDeleteRequest(int sock, ObjectID object_id);
+Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids);
-Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id);
+Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids);
-Status SendDeleteReply(int sock, ObjectID object_id, PlasmaError error);
+Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
+ const std::vector<PlasmaError>& errors);
-Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id);
+Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
+ std::vector<PlasmaError>* errors);
/* Satus messages. */
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index e86db21..c8bf466 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -742,9 +742,14 @@ Status PlasmaStore::process_message(Client* client) {
release_object(object_id, client);
} break;
case MessageType::PlasmaDeleteRequest: {
- RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_id));
- PlasmaError error_code = delete_object(object_id);
- HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_id, error_code), client->fd);
+ std::vector<ObjectID> object_ids;
+ std::vector<PlasmaError> error_codes;
+ RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids));
+ error_codes.reserve(object_ids.size());
+ for (auto& object_id : object_ids) {
+ error_codes.push_back(delete_object(object_id));
+ }
+ HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_ids, error_codes), client->fd);
} break;
case MessageType::PlasmaContainsRequest: {
RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index fa7de04..e40f6d9 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -165,7 +165,7 @@ TEST_F(TestPlasmaStore, DeleteTest) {
// Test for deleting non-existance object.
Status result = client_.Delete(object_id);
- ASSERT_TRUE(result.IsPlasmaObjectNonexistent());
+ ARROW_CHECK_OK(result);
// Test for the object being in local Plasma store.
// First create object.
@@ -176,15 +176,42 @@ TEST_F(TestPlasmaStore, DeleteTest) {
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id));
- // Object is in use, can't be delete.
result = client_.Delete(object_id);
- ASSERT_TRUE(result.IsUnknownError());
+ // TODO: Guarantee that the in-use object will be deleted when it is released.
+ ARROW_CHECK_OK(result);
// Avoid race condition of Plasma Manager waiting for notification.
ARROW_CHECK_OK(client_.Release(object_id));
ARROW_CHECK_OK(client_.Delete(object_id));
}
+TEST_F(TestPlasmaStore, DeleteObjectsTest) {
+ ObjectID object_id1 = ObjectID::from_random();
+ ObjectID object_id2 = ObjectID::from_random();
+
+ // Test for deleting non-existance object.
+ Status result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
+ ARROW_CHECK_OK(result);
+ // Test for the object being in local Plasma store.
+ // First create object.
+ int64_t data_size = 100;
+ uint8_t metadata[] = {5};
+ int64_t metadata_size = sizeof(metadata);
+ std::shared_ptr<Buffer> data;
+ ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
+ ARROW_CHECK_OK(client_.Seal(object_id1));
+ ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data));
+ ARROW_CHECK_OK(client_.Seal(object_id2));
+ // Objects are in use.
+ result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
+ // TODO: Guarantee that the in-use object will be deleted when it is released.
+ ARROW_CHECK_OK(result);
+ // Avoid race condition of Plasma Manager waiting for notification.
+ ARROW_CHECK_OK(client_.Release(object_id1));
+ ARROW_CHECK_OK(client_.Release(object_id2));
+ ARROW_CHECK_OK(client_.Delete(std::vector<ObjectID>{object_id1, object_id2}));
+}
+
TEST_F(TestPlasmaStore, ContainsTest) {
ObjectID object_id = ObjectID::from_random();
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index e05c1b4..1b445f2 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -223,12 +223,13 @@ TEST(PlasmaSerialization, ReleaseReply) {
TEST(PlasmaSerialization, DeleteRequest) {
int fd = create_temp_file();
ObjectID object_id1 = ObjectID::from_random();
- ARROW_CHECK_OK(SendDeleteRequest(fd, object_id1));
+ ARROW_CHECK_OK(SendDeleteRequest(fd, std::vector<ObjectID>{object_id1}));
std::vector<uint8_t> data =
read_message_from_file(fd, MessageType::PlasmaDeleteRequest);
- ObjectID object_id2;
- ARROW_CHECK_OK(ReadDeleteRequest(data.data(), data.size(), &object_id2));
- ASSERT_EQ(object_id1, object_id2);
+ std::vector<ObjectID> object_vec;
+ ARROW_CHECK_OK(ReadDeleteRequest(data.data(), data.size(), &object_vec));
+ ASSERT_EQ(object_vec.size(), 1);
+ ASSERT_EQ(object_id1, object_vec[0]);
close(fd);
}
@@ -236,12 +237,17 @@ TEST(PlasmaSerialization, DeleteReply) {
int fd = create_temp_file();
ObjectID object_id1 = ObjectID::from_random();
PlasmaError error1 = PlasmaError::ObjectExists;
- ARROW_CHECK_OK(SendDeleteReply(fd, object_id1, error1));
+ ARROW_CHECK_OK(SendDeleteReply(fd, std::vector<ObjectID>{object_id1},
+ std::vector<PlasmaError>{error1}));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaDeleteReply);
- ObjectID object_id2;
- Status s = ReadDeleteReply(data.data(), data.size(), &object_id2);
- ASSERT_EQ(object_id1, object_id2);
- ASSERT_TRUE(s.IsPlasmaObjectExists());
+ std::vector<ObjectID> object_vec;
+ std::vector<PlasmaError> error_vec;
+ Status s = ReadDeleteReply(data.data(), data.size(), &object_vec, &error_vec);
+ ASSERT_EQ(object_vec.size(), 1);
+ ASSERT_EQ(object_id1, object_vec[0]);
+ ASSERT_EQ(error_vec.size(), 1);
+ ASSERT_TRUE(error_vec[0] == PlasmaError::ObjectExists);
+ ASSERT_TRUE(s.ok());
close(fd);
}