You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/11/10 14:36:19 UTC
[arrow] branch master updated: ARROW-1788 Fix Plasma store abort bug on client disconnection
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2d34f34 ARROW-1788 Fix Plasma store abort bug on client disconnection
2d34f34 is described below
commit 2d34f34dc81966f3e186055dc0b962699c98b236
Author: Stephanie <sw...@cs.berkeley.edu>
AuthorDate: Fri Nov 10 09:36:12 2017 -0500
ARROW-1788 Fix Plasma store abort bug on client disconnection
Author: Stephanie <sw...@cs.berkeley.edu>
Closes #1299 from stephanie-wang/plasma-client-disconnect-bug and squashes the following commits:
295144bd [Stephanie] Revert disconnect client check
8e24affd [Stephanie] Refactor abort_object to match remove_client_from_object_clients
b41591d5 [Stephanie] When disconnecting a plasma client, only abort the objects that the client created
50932e53 [Stephanie] Add Plasma test for multiple clients
---
cpp/src/plasma/store.cc | 27 +++++++++++++++++---------
cpp/src/plasma/store.h | 9 ++++++++-
cpp/src/plasma/test/client_tests.cc | 38 +++++++++++++++++++++++++++++++++++++
3 files changed, 64 insertions(+), 10 deletions(-)
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 5dbdebc..31033cc 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -393,16 +393,22 @@ void PlasmaStore::seal_object(const ObjectID& object_id, unsigned char digest[])
update_object_get_requests(object_id);
}
-void PlasmaStore::abort_object(const ObjectID& object_id) {
+int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
auto entry = get_object_table_entry(&store_info_, object_id);
ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table.";
ARROW_CHECK(entry->state != PLASMA_SEALED)
<< "To abort an object it must not have been sealed.";
- ARROW_CHECK(entry->clients.size() == 1)
- << "To abort an object, the only client currently using it is the creator.";
-
- dlfree(entry->pointer);
- store_info_.objects.erase(object_id);
+ auto it = entry->clients.find(client);
+ if (it == entry->clients.end()) {
+ // If the client requesting the abort is not the creator, do not
+ // perform the abort.
+ return 0;
+ } else {
+ // The client requesting the abort is the creator. Free the object.
+ dlfree(entry->pointer);
+ store_info_.objects.erase(object_id);
+ return 1;
+ }
}
void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
@@ -454,11 +460,12 @@ void PlasmaStore::disconnect_client(int client_fd) {
ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
// If this client was using any objects, remove it from the appropriate
// lists.
+ auto client = it->second.get();
for (const auto& entry : store_info_.objects) {
if (entry.second->state == PLASMA_SEALED) {
- remove_client_from_object_clients(entry.second.get(), it->second.get());
+ remove_client_from_object_clients(entry.second.get(), client);
} else {
- abort_object(entry.first);
+ abort_object(entry.first, client);
}
}
@@ -600,7 +607,9 @@ Status PlasmaStore::process_message(Client* client) {
} break;
case MessageType_PlasmaAbortRequest: {
RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
- abort_object(object_id);
+ ARROW_CHECK(abort_object(object_id, client) == 1) << "To abort an object, the only "
+ "client currently using it "
+ "must be the creator.";
HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
} break;
case MessageType_PlasmaGetRequest: {
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 0d08d8a..a72c625 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -74,7 +74,14 @@ class PlasmaStore {
int create_object(const ObjectID& object_id, int64_t data_size, int64_t metadata_size,
Client* client, PlasmaObject* result);
- void abort_object(const ObjectID& object_id);
+ /// Abort a created but unsealed object. If the client is not the
+ /// creator, then the abort will fail.
+ ///
+ /// @param object_id Object ID of the object to be aborted.
+ /// @param client The client who created the object. If this does not
+ /// match the creator of the object, then the abort will fail.
+ /// @return 1 if the abort succeeds, else 0.
+ int abort_object(const ObjectID& object_id, Client* client);
/// Delete objects that have been created in the hash table. This should only
/// be called on objects that are returned by the eviction policy to evict.
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 5c0cee4..d4285f8 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -45,14 +45,17 @@ class TestPlasmaStore : public ::testing::Test {
"/plasma_store -m 1000000000 -s /tmp/store 1> /dev/null 2> /dev/null &";
system(plasma_command.c_str());
ARROW_CHECK_OK(client_.Connect("/tmp/store", "", PLASMA_DEFAULT_RELEASE_DELAY));
+ ARROW_CHECK_OK(client2_.Connect("/tmp/store", "", PLASMA_DEFAULT_RELEASE_DELAY));
}
virtual void Finish() {
ARROW_CHECK_OK(client_.Disconnect());
+ ARROW_CHECK_OK(client2_.Disconnect());
system("killall plasma_store &");
}
protected:
PlasmaClient client_;
+ PlasmaClient client2_;
};
TEST_F(TestPlasmaStore, ContainsTest) {
@@ -171,6 +174,41 @@ TEST_F(TestPlasmaStore, AbortTest) {
}
}
+TEST_F(TestPlasmaStore, MultipleClientTest) {
+ ObjectID object_id = ObjectID::from_random();
+
+ // Test for object non-existence on the first client.
+ bool has_object;
+ ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+ ASSERT_EQ(has_object, false);
+
+ // Test for the object being in local Plasma store.
+ // First create and seal object on the second client.
+ int64_t data_size = 100;
+ uint8_t metadata[] = {5};
+ int64_t metadata_size = sizeof(metadata);
+ uint8_t* data;
+ ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
+ ARROW_CHECK_OK(client2_.Seal(object_id));
+ // Test that the first client can get the object.
+ ObjectBuffer object_buffer;
+ ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+ ASSERT_EQ(has_object, true);
+
+ // Test that one client disconnecting does not interfere with the other.
+ // First create object on the second client.
+ object_id = ObjectID::from_random();
+ ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
+ // Disconnect the first client.
+ ARROW_CHECK_OK(client_.Disconnect());
+ // Test that the second client can seal and get the created object.
+ ARROW_CHECK_OK(client2_.Seal(object_id));
+ ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+ ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
+ ASSERT_EQ(has_object, true);
+}
+
} // namespace plasma
int main(int argc, char** argv) {
--
To stop receiving notification emails like this one, please contact
"commits@arrow.apache.org" <co...@arrow.apache.org>.