You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2017/11/08 22:13:32 UTC
[arrow] branch master updated: ARROW-1775: Ability to abort created but unsealed Plasma objects
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 78872a1 ARROW-1775: Ability to abort created but unsealed Plasma objects
78872a1 is described below
commit 78872a1be263e61d7901eb36663a184c2b04effb
Author: Stephanie <sw...@cs.berkeley.edu>
AuthorDate: Wed Nov 8 14:13:28 2017 -0800
ARROW-1775: Ability to abort created but unsealed Plasma objects
Author: Stephanie <sw...@cs.berkeley.edu>
Author: Philipp Moritz <pc...@gmail.com>
Closes #1289 from stephanie-wang/abort-objects and squashes the following commits:
38c42b9 [Stephanie] TODO for PascalCase
08d4040 [Stephanie] Move documentation
dd5b29e [Stephanie] Fix memory error
e6934ac [Philipp Moritz] fix linting
2b8e385 [Stephanie] Return status code when unmapping object
fe20b3b [Stephanie] Add test case for PlasmaClient::Abort
646190c [Stephanie] Abort objects that were not sealed when client disconnects
5fc44c5 [Stephanie] Implement PlasmaClient::Abort
---
cpp/src/plasma/client.cc | 97 +++++++++++++++++++++++++++++--------
cpp/src/plasma/client.h | 19 ++++++++
cpp/src/plasma/format/plasma.fbs | 12 +++++
cpp/src/plasma/protocol.cc | 28 +++++++++++
cpp/src/plasma/protocol.h | 8 +++
cpp/src/plasma/store.cc | 23 ++++++++-
cpp/src/plasma/store.h | 3 ++
cpp/src/plasma/test/client_tests.cc | 44 +++++++++++++++++
8 files changed, 213 insertions(+), 21 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index e57a2a6..dd32bdc 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -278,6 +278,39 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
return Status::OK();
}
+Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
+  // Precondition: the client tracks this object and holds no references to it.
+  auto object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end());
+  ARROW_CHECK(object_entry->second->count == 0);
+
+  // Each object in use pins the memory-mapped file it lives in; the matching
+  // increment should have happened when the object was obtained (plasma_get).
+  int fd = object_entry->second->object.handle.store_fd;
+  auto entry = mmap_table_.find(fd);
+  ARROW_CHECK(entry != mmap_table_.end());
+  ARROW_CHECK(entry->second.count >= 1);
+  if (entry->second.count > 1) {
+    // Other objects still use this file: only drop this object's pin.
+    entry->second.count -= 1;
+  } else {
+    // This was the last object using the file, so unmap it. On failure the
+    // client's bookkeeping is left untouched.
+    if (munmap(entry->second.pointer, entry->second.length) == -1) {
+      return Status::IOError("Error during munmap");
+    }
+    mmap_table_.erase(entry);
+  }
+
+  // The object's data and metadata no longer count as in use.
+  const auto object_bytes = object_entry->second->object.data_size +
+                            object_entry->second->object.metadata_size;
+  in_use_object_bytes_ -= object_bytes;
+  DCHECK_GE(in_use_object_bytes_, 0);
+  objects_in_use_.erase(object_entry);
+  return Status::OK();
+}
+
/// This is a helper method for implementing plasma_release. We maintain a
/// buffer
/// of release calls and only perform them once the buffer becomes full (as
@@ -297,28 +330,9 @@ Status PlasmaClient::PerformRelease(const ObjectID& object_id) {
ARROW_CHECK(object_entry->second->count >= 0);
// Check if the client is no longer using this object.
if (object_entry->second->count == 0) {
- // Decrement the count of the number of objects in this memory-mapped file
- // that the client is using. The corresponding increment should have
- // happened in plasma_get.
- int fd = object_entry->second->object.handle.store_fd;
- auto entry = mmap_table_.find(fd);
- ARROW_CHECK(entry != mmap_table_.end());
- entry->second.count -= 1;
- ARROW_CHECK(entry->second.count >= 0);
- // If none are being used then unmap the file.
- if (entry->second.count == 0) {
- munmap(entry->second.pointer, entry->second.length);
- // Remove the corresponding entry from the hash table.
- mmap_table_.erase(fd);
- }
// Tell the store that the client no longer needs the object.
+ RETURN_NOT_OK(UnmapObject(object_id));
RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
- // Update the in_use_object_bytes_.
- in_use_object_bytes_ -= (object_entry->second->object.data_size +
- object_entry->second->object.metadata_size);
- DCHECK_GE(in_use_object_bytes_, 0);
- // Remove the entry from the hash table of objects currently in use.
- objects_in_use_.erase(object_id);
}
return Status::OK();
}
@@ -344,6 +358,20 @@ Status PlasmaClient::Release(const ObjectID& object_id) {
return Status::OK();
}
+Status PlasmaClient::FlushReleaseHistory() {
+  // Only a connected client has releases to deliver; a disconnected one
+  // silently ignores the flush.
+  if (store_conn_ >= 0) {
+    // Drain the pending releases starting from the back of release_history_.
+    // If a release fails, its entry stays in the history and the error is
+    // returned to the caller.
+    while (!release_history_.empty()) {
+      const auto& pending = release_history_.back();
+      RETURN_NOT_OK(PerformRelease(pending));
+      release_history_.pop_back();
+    }
+  }
+  return Status::OK();
+}
+
// This method is used to query whether the plasma store contains an object.
Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
// Check if we already have a reference to the object.
@@ -443,6 +471,35 @@ Status PlasmaClient::Seal(const ObjectID& object_id) {
return Release(object_id);
}
+// Abort an object that this client created but has not sealed. The store
+// frees the object, and the client drops its last reference and unmaps it.
+//
+// Returns Status::Invalid if the caller still holds more than the single
+// reference that Seal would have released, or if all of its references were
+// already released. Otherwise returns the first error from the store
+// round-trip or from unmapping.
+Status PlasmaClient::Abort(const ObjectID& object_id) {
+  auto object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end())
+      << "Plasma client called abort on an object without a reference to it";
+  ARROW_CHECK(!object_entry->second->is_sealed)
+      << "Plasma client called abort on a sealed object";
+
+  // Flush the release history so the reference count below reflects every
+  // Release the caller has already issued.
+  RETURN_NOT_OK(FlushReleaseHistory());
+  // Flushing can run UnmapObject, which erases entries from objects_in_use_.
+  // If the caller released this object too many times, that erased our entry
+  // and the iterator above now dangles, so look the object up again.
+  object_entry = objects_in_use_.find(object_id);
+  if (object_entry == objects_in_use_.end()) {
+    return Status::Invalid(
+        "Plasma client released the object before calling abort.");
+  }
+  // Make sure that the Plasma client only has one reference to the object. If
+  // it has more, then the client needs to release the buffer before calling
+  // abort.
+  if (object_entry->second->count > 1) {
+    return Status::Invalid("Plasma client cannot have a reference to the buffer.");
+  }
+
+  // Send the abort request.
+  RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id));
+  // Read the store's reply before touching local state. If we returned early
+  // on a local failure instead, the reply would stay unread on the socket and
+  // be mistaken for the reply to the client's next request.
+  std::vector<uint8_t> buffer;
+  int64_t type = 0;
+  Status reply_status = ReadMessage(store_conn_, &type, &buffer);
+  if (reply_status.ok()) {
+    ObjectID id;
+    reply_status = ReadAbortReply(buffer.data(), buffer.size(), &id);
+  }
+  // The store has been asked to free the object, so drop the client's last
+  // reference and unmap it however the reply turned out.
+  object_entry->second->count--;
+  Status unmap_status = UnmapObject(object_id);
+  RETURN_NOT_OK(reply_status);
+  return unmap_status;
+}
+
Status PlasmaClient::Delete(const ObjectID& object_id) {
// TODO(rkn): In the future, we can use this method to give hints to the
// eviction policy about when an object will no longer be needed.
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 1459424..89df2b0 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -152,6 +152,15 @@ class ARROW_EXPORT PlasmaClient {
/// \return The return status.
Status Contains(const ObjectID& object_id, bool* has_object);
+ /// Abort an unsealed object in the object store. If the abort succeeds, then
+ /// it will be as if the object was never created at all. The unsealed object
+ /// must have only a single reference (the one that would have been removed by
+ /// calling Seal).
+ ///
+ /// \param object_id The ID of the object to abort.
+ /// \return The return status.
+ Status Abort(const ObjectID& object_id);
+
/// Seal an object in the object store. The object will be immutable after
/// this
/// call.
@@ -307,6 +316,16 @@ class ARROW_EXPORT PlasmaClient {
int get_manager_fd();
private:
+ /// This is a helper method for unmapping objects for which all references have
+ /// gone out of scope, either by calling Release or Abort.
+ ///
+ /// \param object_id The object ID whose data we should unmap.
+ /// \return The return status.
+ Status UnmapObject(const ObjectID& object_id);
+
+ /// This is a helper method that flushes all pending release calls to the
+ /// store.
+ ///
+ /// \return The return status.
+ Status FlushReleaseHistory();
+
Status PerformRelease(const ObjectID& object_id);
uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 23782ad..b6d03b8 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -21,6 +21,8 @@ enum MessageType:int {
// Create a new object.
PlasmaCreateRequest = 1,
PlasmaCreateReply,
+ PlasmaAbortRequest,
+ PlasmaAbortReply,
// Seal an object.
PlasmaSealRequest,
PlasmaSealReply,
@@ -113,6 +115,16 @@ table PlasmaCreateReply {
error: PlasmaError;
}
+// Sent by a client (PlasmaClient::Abort) to ask the store to discard an
+// object that the client created but has not sealed.
+table PlasmaAbortRequest {
+ // ID of the object to be aborted.
+ object_id: string;
+}
+
+// Sent by the store after it has handled a PlasmaAbortRequest; it echoes the
+// ID from the request.
+table PlasmaAbortReply {
+ // ID of the object that was aborted.
+ object_id: string;
+}
+
table PlasmaSealRequest {
// ID of the object to be sealed.
object_id: string;
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 2261b6a..c0ebb88 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -100,6 +100,34 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
return plasma_error_status(message->error());
}
+// Serialize a PlasmaAbortRequest naming object_id and send it over sock.
+Status SendAbortRequest(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder fbb;
+  // The ID string must be built before the table that refers to it.
+  auto id_offset = fbb.CreateString(object_id.binary());
+  auto request = CreatePlasmaAbortRequest(fbb, id_offset);
+  return PlasmaSend(sock, MessageType_PlasmaAbortRequest, &fbb, request);
+}
+
+// Decode a PlasmaAbortRequest from the `size` bytes at `data` and write the
+// requested object's ID to *object_id. Buffer verification sits inside DCHECK,
+// so it presumably runs only in debug builds.
+Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id) {
+  DCHECK(data);
+  const auto* message = flatbuffers::GetRoot<PlasmaAbortRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
+  const std::string binary_id = message->object_id()->str();
+  *object_id = ObjectID::from_binary(binary_id);
+  return Status::OK();
+}
+
+// Serialize a PlasmaAbortReply naming object_id and send it over sock.
+Status SendAbortReply(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder fbb;
+  // The ID string must be built before the table that refers to it.
+  auto id_offset = fbb.CreateString(object_id.binary());
+  auto reply = CreatePlasmaAbortReply(fbb, id_offset);
+  return PlasmaSend(sock, MessageType_PlasmaAbortReply, &fbb, reply);
+}
+
+// Decode a PlasmaAbortReply from the `size` bytes at `data` and write the
+// aborted object's ID to *object_id. Buffer verification sits inside DCHECK,
+// so it presumably runs only in debug builds.
+Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
+  DCHECK(data);
+  const auto* message = flatbuffers::GetRoot<PlasmaAbortReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
+  const std::string binary_id = message->object_id()->str();
+  *object_id = ObjectID::from_binary(binary_id);
+  return Status::OK();
+}
+
// Seal messages.
Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index af4b139..e8c334f 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -51,6 +51,14 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int e
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
PlasmaObject* object);
+/* Plasma Abort message functions. */
+
+/// Serialize a PlasmaAbortRequest for object_id and send it on sock.
+Status SendAbortRequest(int sock, ObjectID object_id);
+
+/// Parse a PlasmaAbortRequest from the size bytes at data into *object_id.
+Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id);
+
+/// Serialize a PlasmaAbortReply for object_id and send it on sock.
+Status SendAbortReply(int sock, ObjectID object_id);
+
+/// Parse a PlasmaAbortReply from the size bytes at data into *object_id.
+Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id);
+
/* Plasma Seal message functions. */
Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 210cce1..5dbdebc 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -393,6 +393,18 @@ void PlasmaStore::seal_object(const ObjectID& object_id, unsigned char digest[])
update_object_get_requests(object_id);
}
+// Discard an object that was created but never sealed: free its allocation and
+// remove its entry from the object table.
+//
+// NOTE(review): the last check only looks at entry->clients.size(); it does not
+// confirm that the single remaining client is the one requesting the abort.
+// disconnect_client calls this for every unsealed object in the table, so it
+// can free an object created by a different, still-connected client.
+// TODO: pass the requesting Client* and skip objects it did not create.
+// NOTE(review): this erases from store_info_.objects, while disconnect_client
+// calls it from inside a range-for over that same map. That invalidates the
+// loop's current iterator; the caller should be restructured.
+// NOTE(review): object_id can come straight from a client's PlasmaAbortRequest
+// (process_message). An unknown or sealed ID trips an ARROW_CHECK, which
+// presumably terminates the whole store rather than rejecting the request.
+void PlasmaStore::abort_object(const ObjectID& object_id) {
+ auto entry = get_object_table_entry(&store_info_, object_id);
+ ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table.";
+ ARROW_CHECK(entry->state != PLASMA_SEALED)
+ << "To abort an object it must not have been sealed.";
+ ARROW_CHECK(entry->clients.size() == 1)
+ << "To abort an object, the only client currently using it is the creator.";
+
+ // Free the allocation while `entry` is still valid; the table holds entries
+ // through smart pointers, so erasing it destroys `entry`.
+ dlfree(entry->pointer);
+ store_info_.objects.erase(object_id);
+}
+
void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
for (const auto& object_id : object_ids) {
ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
@@ -443,7 +455,11 @@ void PlasmaStore::disconnect_client(int client_fd) {
// If this client was using any objects, remove it from the appropriate
// lists.
for (const auto& entry : store_info_.objects) {
- remove_client_from_object_clients(entry.second.get(), it->second.get());
+ if (entry.second->state == PLASMA_SEALED) {
+ remove_client_from_object_clients(entry.second.get(), it->second.get());
+ } else {
+ abort_object(entry.first);
+ }
}
// Note, the store may still attempt to send a message to the disconnected
@@ -582,6 +598,11 @@ Status PlasmaStore::process_message(Client* client) {
warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd);
}
} break;
+ case MessageType_PlasmaAbortRequest: {
+ RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
+ abort_object(object_id);
+ HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
+ } break;
case MessageType_PlasmaGetRequest: {
std::vector<ObjectID> object_ids_to_get;
int64_t timeout_ms;
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index d03d11f..0d08d8a 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -48,6 +48,7 @@ struct Client {
class PlasmaStore {
public:
+ // TODO: PascalCase PlasmaStore methods.
PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory,
bool hugetlbfs_enabled);
@@ -73,6 +74,8 @@ class PlasmaStore {
int create_object(const ObjectID& object_id, int64_t data_size, int64_t metadata_size,
Client* client, PlasmaObject* result);
+ void abort_object(const ObjectID& object_id);
+
/// Delete objects that have been created in the hash table. This should only
/// be called on objects that are returned by the eviction policy to evict.
///
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 0f19da5..5c0cee4 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -127,6 +127,50 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
ASSERT_EQ(object_buffer[1].data[0], 2);
}
+// Exercises PlasmaClient::Abort: aborting fails while the caller still holds
+// the buffer, succeeds after a Release, leaves no trace in the client or the
+// store, and frees the ID for a fresh Create/Seal.
+TEST_F(TestPlasmaStore, AbortTest) {
+  ObjectID object_id = ObjectID::from_random();
+  ObjectBuffer object_buffer;
+
+  // Test for object non-existence.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+  ASSERT_EQ(object_buffer.data_size, -1);
+
+  // Test object abort.
+  // First create object.
+  int64_t data_size = 4;
+  uint8_t metadata[] = {5};
+  int64_t metadata_size = sizeof(metadata);
+  uint8_t* data;
+  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+  // Write to only part of the buffer before aborting.
+  for (int64_t i = 0; i < data_size / 2; i++) {
+    data[i] = static_cast<uint8_t>(i % 4);
+  }
+  // Attempt to abort. Test that this fails before the first release.
+  Status status = client_.Abort(object_id);
+  ASSERT_TRUE(status.IsInvalid());
+  // Release, then abort.
+  ARROW_CHECK_OK(client_.Release(object_id));
+  ARROW_CHECK_OK(client_.Abort(object_id));
+
+  // Test for object non-existence after the abort.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+  ASSERT_EQ(object_buffer.data_size, -1);
+  // The store itself must no longer report the aborted object.
+  bool has_object = true;
+  ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+  ASSERT_FALSE(has_object);
+
+  // Create the object successfully this time.
+  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+  for (int64_t i = 0; i < data_size; i++) {
+    data[i] = static_cast<uint8_t>(i % 4);
+  }
+  ARROW_CHECK_OK(client_.Seal(object_id));
+
+  // Test that we can get the object.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  for (int64_t i = 0; i < data_size; i++) {
+    ASSERT_EQ(data[i], object_buffer.data[i]);
+  }
+  // Drop the reference taken by Get.
+  ARROW_CHECK_OK(client_.Release(object_id));
+}
+
} // namespace plasma
int main(int argc, char** argv) {
--
To stop receiving notification emails like this one, please contact
commits@arrow.apache.org.