You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/11/15 15:51:50 UTC

[arrow] branch master updated: ARROW-1812: [C++] Plasma store modifies hash table while iterating during client disconnect

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9812aea  ARROW-1812: [C++] Plasma store modifies hash table while iterating during client disconnect
9812aea is described below

commit 9812aea13e810626f4f5b74f1ca4ecbb843578e9
Author: Stephanie <sw...@cs.berkeley.edu>
AuthorDate: Wed Nov 15 10:51:45 2017 -0500

    ARROW-1812: [C++] Plasma store modifies hash table while iterating during client disconnect
    
    Author: Stephanie <sw...@cs.berkeley.edu>
    
    Closes #1322 from stephanie-wang/plasma-disconnect-bug and squashes the following commits:
    
    cf1872d8 [Stephanie] Fix disconnect client memory error
    8929b6bc [Stephanie] Introduce more complicated test for multiple Plasma clients
---
 cpp/src/plasma/store.cc             | 10 ++++++-
 cpp/src/plasma/test/client_tests.cc | 53 +++++++++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 31033cc..7094aed 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -460,14 +460,22 @@ void PlasmaStore::disconnect_client(int client_fd) {
   ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
   // If this client was using any objects, remove it from the appropriate
   // lists.
+  // TODO(swang): Avoid iteration through the object table.
   auto client = it->second.get();
+  std::vector<ObjectID> unsealed_objects;
   for (const auto& entry : store_info_.objects) {
     if (entry.second->state == PLASMA_SEALED) {
       remove_client_from_object_clients(entry.second.get(), client);
     } else {
-      abort_object(entry.first, client);
+      // Add unsealed objects to a temporary list of object IDs. Do not perform
+      // the abort here, since it potentially modifies the object table.
+      unsealed_objects.push_back(entry.first);
     }
   }
+  // If the client was creating any objects, abort them.
+  for (const auto& entry : unsealed_objects) {
+    abort_object(entry, client);
+  }
 
   // Note, the store may still attempt to send a message to the disconnected
   // client (for example, when an object ID that the client was waiting for
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index d4285f8..f44ed25 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -209,6 +209,59 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
   ASSERT_EQ(has_object, true);
 }
 
+TEST_F(TestPlasmaStore, ManyObjectTest) {
+  // Exercise client disconnect with many objects: create them on the first
+  // client, then seal one third, abort one third, and leave the rest unsealed.
+  std::vector<ObjectID> object_ids;
+  for (int i = 0; i < 100; i++) {
+    ObjectID object_id = ObjectID::from_random();
+    object_ids.push_back(object_id);
+
+    // The first client must not see the object before it has been created.
+    bool has_object;
+    ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+    ASSERT_EQ(has_object, false);
+
+    // Create the object on the first client. Whether it is then sealed,
+    // aborted, or left unsealed depends on i % 3 below.
+    int64_t data_size = 100;
+    uint8_t metadata[] = {5};
+    int64_t metadata_size = sizeof(metadata);
+    uint8_t* data;
+    ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+
+    if (i % 3 == 0) {
+      // Seal one third of the objects.
+      ARROW_CHECK_OK(client_.Seal(object_id));
+      // Test that the first client now sees the sealed object in the store.
+      ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+      ASSERT_EQ(has_object, true);
+    } else if (i % 3 == 1) {
+      // Release and then abort one third of the objects.
+      ARROW_CHECK_OK(client_.Release(object_id));
+      ARROW_CHECK_OK(client_.Abort(object_id));
+    }
+  }
+  // Disconnect the first client. All unsealed objects should be aborted.
+  ARROW_CHECK_OK(client_.Disconnect());
+
+  // Check from the second client which of the first client's objects are
+  // still in the object store after the disconnect.
+  int i = 0;
+  for (auto const& object_id : object_ids) {
+    bool has_object;
+    ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
+    if (i % 3 == 0) {
+      // The first third should be sealed.
+      ASSERT_EQ(has_object, true);
+    } else {
+      // The rest were aborted (explicitly or at disconnect), so are not stored.
+      ASSERT_EQ(has_object, false);
+    }
+    i++;
+  }
+}
+
 }  // namespace plasma
 
 int main(int argc, char** argv) {

-- 
To stop receiving notification emails like this one, please contact
['"commits@arrow.apache.org" <co...@arrow.apache.org>'].