You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/11/10 14:36:19 UTC

[arrow] branch master updated: ARROW-1788 Fix Plasma store abort bug on client disconnection

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d34f34  ARROW-1788 Fix Plasma store abort bug on client disconnection
2d34f34 is described below

commit 2d34f34dc81966f3e186055dc0b962699c98b236
Author: Stephanie <sw...@cs.berkeley.edu>
AuthorDate: Fri Nov 10 09:36:12 2017 -0500

    ARROW-1788 Fix Plasma store abort bug on client disconnection
    
    Author: Stephanie <sw...@cs.berkeley.edu>
    
    Closes #1299 from stephanie-wang/plasma-client-disconnect-bug and squashes the following commits:
    
    295144bd [Stephanie] Revert disconnect client check
    8e24affd [Stephanie] Refactor abort_object to match remove_client_from_object_clients
    b41591d5 [Stephanie] When disconnecting a plasma client, only abort the objects that the client created
    50932e53 [Stephanie] Add Plasma test for multiple clients
---
 cpp/src/plasma/store.cc             | 27 +++++++++++++++++---------
 cpp/src/plasma/store.h              |  9 ++++++++-
 cpp/src/plasma/test/client_tests.cc | 38 +++++++++++++++++++++++++++++++++++++
 3 files changed, 64 insertions(+), 10 deletions(-)

diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 5dbdebc..31033cc 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -393,16 +393,22 @@ void PlasmaStore::seal_object(const ObjectID& object_id, unsigned char digest[])
   update_object_get_requests(object_id);
 }
 
-void PlasmaStore::abort_object(const ObjectID& object_id) {
+int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
   auto entry = get_object_table_entry(&store_info_, object_id);
   ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table.";
   ARROW_CHECK(entry->state != PLASMA_SEALED)
       << "To abort an object it must not have been sealed.";
-  ARROW_CHECK(entry->clients.size() == 1)
-      << "To abort an object, the only client currently using it is the creator.";
-
-  dlfree(entry->pointer);
-  store_info_.objects.erase(object_id);
+  auto it = entry->clients.find(client);
+  if (it == entry->clients.end()) {
+    // If the client requesting the abort is not the creator, do not
+    // perform the abort.
+    return 0;
+  } else {
+    // The client requesting the abort is the creator. Free the object.
+    dlfree(entry->pointer);
+    store_info_.objects.erase(object_id);
+    return 1;
+  }
 }
 
 void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
@@ -454,11 +460,12 @@ void PlasmaStore::disconnect_client(int client_fd) {
   ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
   // If this client was using any objects, remove it from the appropriate
   // lists.
+  auto client = it->second.get();
   for (const auto& entry : store_info_.objects) {
     if (entry.second->state == PLASMA_SEALED) {
-      remove_client_from_object_clients(entry.second.get(), it->second.get());
+      remove_client_from_object_clients(entry.second.get(), client);
     } else {
-      abort_object(entry.first);
+      abort_object(entry.first, client);
     }
   }
 
@@ -600,7 +607,9 @@ Status PlasmaStore::process_message(Client* client) {
     } break;
     case MessageType_PlasmaAbortRequest: {
       RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
-      abort_object(object_id);
+      ARROW_CHECK(abort_object(object_id, client) == 1) << "To abort an object, the only "
+                                                           "client currently using it "
+                                                           "must be the creator.";
       HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
     } break;
     case MessageType_PlasmaGetRequest: {
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 0d08d8a..a72c625 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -74,7 +74,14 @@ class PlasmaStore {
   int create_object(const ObjectID& object_id, int64_t data_size, int64_t metadata_size,
                     Client* client, PlasmaObject* result);
 
-  void abort_object(const ObjectID& object_id);
+  /// Abort a created but unsealed object. If the client is not the
+  /// creator, then the abort will fail.
+  ///
+  /// @param object_id Object ID of the object to be aborted.
+  /// @param client The client requesting the abort. If this client is not
+  ///   the creator of the object, then the abort will fail.
+  /// @return 1 if the abort succeeds, else 0.
+  int abort_object(const ObjectID& object_id, Client* client);
 
   /// Delete objects that have been created in the hash table. This should only
   /// be called on objects that are returned by the eviction policy to evict.
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 5c0cee4..d4285f8 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -45,14 +45,17 @@ class TestPlasmaStore : public ::testing::Test {
         "/plasma_store -m 1000000000 -s /tmp/store 1> /dev/null 2> /dev/null &";
     system(plasma_command.c_str());
     ARROW_CHECK_OK(client_.Connect("/tmp/store", "", PLASMA_DEFAULT_RELEASE_DELAY));
+    ARROW_CHECK_OK(client2_.Connect("/tmp/store", "", PLASMA_DEFAULT_RELEASE_DELAY));
   }
   virtual void Finish() {
     ARROW_CHECK_OK(client_.Disconnect());
+    ARROW_CHECK_OK(client2_.Disconnect());
     system("killall plasma_store &");
   }
 
  protected:
   PlasmaClient client_;
+  PlasmaClient client2_;
 };
 
 TEST_F(TestPlasmaStore, ContainsTest) {
@@ -171,6 +174,41 @@ TEST_F(TestPlasmaStore, AbortTest) {
   }
 }
 
+TEST_F(TestPlasmaStore, MultipleClientTest) {
+  ObjectID object_id = ObjectID::from_random();
+
+  // Test for object non-existence on the first client.
+  bool has_object;
+  ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+  ASSERT_EQ(has_object, false);
+
+  // Test for the object being in local Plasma store.
+  // First create and seal object on the second client.
+  int64_t data_size = 100;
+  uint8_t metadata[] = {5};
+  int64_t metadata_size = sizeof(metadata);
+  uint8_t* data;
+  ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
+  ARROW_CHECK_OK(client2_.Seal(object_id));
+  // Test that the first client can get the object.
+  ObjectBuffer object_buffer;
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+  ASSERT_EQ(has_object, true);
+
+  // Test that one client disconnecting does not interfere with the other.
+  // First create object on the second client.
+  object_id = ObjectID::from_random();
+  ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
+  // Disconnect the first client.
+  ARROW_CHECK_OK(client_.Disconnect());
+  // Test that the second client can seal and get the created object.
+  ARROW_CHECK_OK(client2_.Seal(object_id));
+  ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+  ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
+  ASSERT_EQ(has_object, true);
+}
+
 }  // namespace plasma
 
 int main(int argc, char** argv) {

-- 
To stop receiving notification emails like this one, please contact
"commits@arrow.apache.org" <co...@arrow.apache.org>.