You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/09/30 21:52:45 UTC

[arrow] branch master updated: ARROW-3373: [Plasma] Fix bug when plasma client requests multiple objects and add test.

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3545186  ARROW-3373: [Plasma] Fix bug when plasma client requests multiple objects and add test.
3545186 is described below

commit 3545186d6997b943ffc3d79634f2d08eefbd7322
Author: Robert Nishihara <ro...@gmail.com>
AuthorDate: Sun Sep 30 17:52:31 2018 -0400

    ARROW-3373: [Plasma] Fix bug when plasma client requests multiple objects and add test.
    
    This adds a test (which fails before this fix).
    
    Author: Robert Nishihara <ro...@gmail.com>
    
    Closes #2664 from robertnishihara/fixclientbug and squashes the following commits:
    
    0fe303bce <Robert Nishihara> Fix bug when plasma client requests multiple objects and add test.
---
 cpp/src/plasma/store.cc             | 11 ++++++++---
 python/pyarrow/tests/test_plasma.py | 36 +++++++++++++++++++++++++++++++++++-
 2 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index c1cab81..a01c9a2 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -386,9 +386,14 @@ void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) {
     }
   }
 
-  // Since no one should be waiting for this object anymore, the object ID
-  // should have been removed from the map.
-  ARROW_CHECK(object_get_requests_.count(object_id) == 0);
+  // No get requests should be waiting for this object anymore. The object ID
+  // may already have been removed from object_get_requests_ by ReturnFromGet,
+  // but if the get request has not returned yet, remove the object ID from the
+  // map here.
+  it = object_get_requests_.find(object_id);
+  if (it != object_get_requests_.end()) {
+    object_get_requests_.erase(object_id);
+  }
 }
 
 void PlasmaStore::ProcessGetRequest(Client* client,
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 846d691..834b980 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -774,9 +774,43 @@ class TestPlasmaClient(object):
         self.plasma_client.put(1, object_id=object_id)
 
         # Check that the store is still alive. This will raise an exception if
-        # the client is dead.
+        # the store is dead.
         self.plasma_client.contains(random_object_id())
 
+    def test_client_getting_multiple_objects(self):
+        import pyarrow.plasma as plasma
+
+        object_ids = [random_object_id() for _ in range(10)]
+
+        def client_get_multiple(plasma_store_name):
+            client = plasma.connect(self.plasma_store_name, "", 0)
+            # Try to get object IDs that don't exist yet. This should block.
+            client.get(object_ids)
+
+        p = multiprocessing.Process(target=client_get_multiple,
+                                    args=(self.plasma_store_name, ))
+        p.start()
+        # Make sure the process is running.
+        time.sleep(0.2)
+        assert p.is_alive()
+
+        # Create the objects one by one.
+        for object_id in object_ids:
+            self.plasma_client.put(1, object_id=object_id)
+
+        # Check that the store is still alive. This will raise an exception if
+        # the store is dead.
+        self.plasma_client.contains(random_object_id())
+
+        # Make sure that the blocked client finishes.
+        start_time = time.time()
+        while True:
+            if time.time() - start_time > 5:
+                raise Exception("Timing out while waiting for blocked client "
+                                "to finish.")
+            if not p.is_alive():
+                break
+
 
 @pytest.mark.plasma
 def test_object_id_size():