You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ro...@apache.org on 2018/12/13 23:10:17 UTC

[arrow] branch master updated: ARROW-3958: [Plasma] Reduce number of IPCs

This is an automated email from the ASF dual-hosted git repository.

robertnishihara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b3bc338  ARROW-3958: [Plasma] Reduce number of IPCs
b3bc338 is described below

commit b3bc3384f3068edebe69f1084518ccfb85a368f8
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Thu Dec 13 15:09:27 2018 -0800

    ARROW-3958: [Plasma] Reduce number of IPCs
    
    This PR also removes the client unmap, which is not necessary any more since the introduction of malloc (since there is only few memory mapped files and they typically stay around for the lifetime of the application).
    
    The PR also gets rid of a bunch of code that is not needed any more now (the release buffer, yay!).
    
    Benchmarks:
    
    ```
    import pyarrow.plasma as plasma
    
    client = plasma.connect("/tmp/plasma", "", 0)
    
    # Put performance
    
    def f():
        for i in range(10000):
            client.put(1)
    
    %timeit f()
    
    # without optimization:
    
    # 1.51 s ± 2.22 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 1.52 s ± 9.68 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 1.53 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    # with optimizations:
    
    # 1.27 s ± 10.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 1.31 s ± 8.18 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 1.31 s ± 17.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    # Create/seal performance
    
    def f():
        for i in range(10000):
            object_id = plasma.ObjectID.from_random()
            client.create(object_id, 0)
            client.seal(object_id)
    
    %timeit f()
    
    # without optimizations:
    
    # 571 ms ± 2.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 583 ms ± 22.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 588 ms ± 14.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    # with optimizations:
    
    # 531 ms ± 3.24 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 541 ms ± 9.99 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 542 ms ± 19.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    # Get performance
    
    objects = [client.put(1) for i in range(10000)]
    
    def g():
        for i in range(10000):
            client.get(objects[i])
    
    %timeit g()
    
    # without optimizations
    
    # 1.11 s ± 6.17 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 1.12 s ± 1.49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 1.19 s ± 24.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    # with optimizations
    
    # 776 ms ± 11.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 792 ms ± 3.06 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 778 ms ± 9.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    ```
    
    Author: Philipp Moritz <pc...@gmail.com>
    Author: Robert Nishihara <ro...@gmail.com>
    
    Closes #3124 from pcmoritz/plasma-send-fd and squashes the following commits:
    
    f899f459 <Philipp Moritz> Update client.cc
    a0384040 <Robert Nishihara> Update _plasma.pyx
    af150c14 <Philipp Moritz> comments and fixes
    71c4c5c1 <Philipp Moritz> don't close fd twice
    0d572823 <Philipp Moritz> linting
    f60dcbed <Philipp Moritz> fix tests
    502aeda4 <Philipp Moritz> linting
    2887b170 <Philipp Moritz> clean up some code
    cfff7e32 <Philipp Moritz> lint
    e5ccbbac <Philipp Moritz> fixes
    5f091993 <Philipp Moritz> introduce method
    24beb277 <Philipp Moritz> working version
---
 cpp/src/plasma/client.cc                  | 184 +++++++-----------------------
 cpp/src/plasma/client.h                   |  16 +--
 cpp/src/plasma/store.cc                   |  13 ++-
 cpp/src/plasma/store.h                    |   3 +
 cpp/src/plasma/test/client_tests.cc       |  26 ++---
 docs/source/python/plasma.rst             |  10 +-
 python/pyarrow/_plasma.pyx                |  11 +-
 python/pyarrow/tensorflow/plasma_op.cc    |   4 +-
 python/pyarrow/tests/test_plasma.py       |   8 +-
 python/pyarrow/tests/test_plasma_tf_op.py |   2 +-
 10 files changed, 83 insertions(+), 194 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 99cf00c..2dbe2b4 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -83,9 +83,6 @@ typedef struct XXH64_state_s XXH64_state_t;
 constexpr int64_t kHashingConcurrency = 8;
 constexpr int64_t kBytesInMB = 1 << 20;
 
-// Use 100MB as an overestimate of the L3 cache size.
-constexpr int64_t kL3CacheSizeBytes = 100000000;
-
 // ----------------------------------------------------------------------
 // GPU support
 
@@ -143,22 +140,13 @@ struct ObjectInUseEntry {
   bool is_sealed;
 };
 
-/// Configuration options for the plasma client.
-struct PlasmaClientConfig {
-  /// Number of release calls we wait until the object is actually released.
-  /// This allows us to avoid invalidating the cpu cache on workers if objects
-  /// are reused accross tasks.
-  size_t release_delay;
-};
-
 struct ClientMmapTableEntry {
+  /// The associated file descriptor on the client.
+  int fd;
   /// The result of mmap for this file descriptor.
   uint8_t* pointer;
   /// The length of the memory-mapped file.
   size_t length;
-  /// The number of objects in this memory-mapped file that are currently being
-  /// used by the client. When this count reaches zeros, we unmap the file.
-  int count;
 };
 
 class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
@@ -169,7 +157,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
   // PlasmaClient method implementations
 
   Status Connect(const std::string& store_socket_name,
-                 const std::string& manager_socket_name, int release_delay,
+                 const std::string& manager_socket_name, int release_delay = 0,
                  int num_retries = -1);
 
   Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
@@ -221,18 +209,22 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
 
   int get_manager_fd() const;
 
-  Status FlushReleaseHistory();
-
   bool IsInUse(const ObjectID& object_id);
 
  private:
-  /// This is a helper method for unmapping objects for which all references have
-  /// gone out of scope, either by calling Release or Abort.
+  /// Check if store_fd has already been received from the store. If yes,
+  /// return it. Otherwise, receive it from the store (see analogous logic
+  /// in store.cc).
   ///
-  /// @param object_id The object ID whose data we should unmap.
-  Status UnmapObject(const ObjectID& object_id);
+  /// \param store_fd File descriptor to fetch from the store.
+  /// \return Client file descriptor corresponding to store_fd.
+  int GetStoreFd(int store_fd);
 
-  Status PerformRelease(const ObjectID& object_id);
+  /// This is a helper method for marking an object as unused by this client.
+  ///
+  /// \param object_id The object ID we mark unused.
+  /// \return The return status.
+  Status MarkObjectUnused(const ObjectID& object_id);
 
   /// Common helper for Get() variants
   Status GetBuffers(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
@@ -267,18 +259,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
   /// A hash table of the object IDs that are currently being used by this
   /// client.
   std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>> objects_in_use_;
-  /// Object IDs of the last few release calls. This is a deque and
-  /// is used to delay releasing objects to see if they can be reused by
-  /// subsequent tasks so we do not unneccessarily invalidate cpu caches.
-  /// TODO(pcm): replace this with a proper lru cache using the size of the L3
-  /// cache.
-  std::deque<ObjectID> release_history_;
-  /// The number of bytes in the combined objects that are held in the release
-  /// history doubly-linked list. If this is too large then the client starts
-  /// releasing objects.
-  int64_t in_use_object_bytes_;
-  /// Configuration options for the plasma client.
-  PlasmaClientConfig config_;
   /// The amount of memory available to the Plasma store. The client needs this
   /// information to make sure that it does not delay in releasing so much
   /// memory that the store is unable to evict enough objects to free up space.
@@ -308,7 +288,6 @@ PlasmaClient::Impl::~Impl() {}
 uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
   auto entry = mmap_table_.find(store_fd_val);
   if (entry != mmap_table_.end()) {
-    close(fd);
     return entry->second.pointer;
   } else {
     // We subtract kMmapRegionsGap from the length that was added
@@ -322,9 +301,9 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_
     close(fd);  // Closing this fd has an effect on performance.
 
     ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
+    entry.fd = fd;
     entry.pointer = result;
     entry.length = map_size;
-    entry.count = 0;
     return result;
   }
 }
@@ -342,6 +321,17 @@ bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
   return (elem != objects_in_use_.end());
 }
 
+int PlasmaClient::Impl::GetStoreFd(int store_fd) {
+  auto entry = mmap_table_.find(store_fd);
+  if (entry == mmap_table_.end()) {
+    int fd = recv_fd(store_conn_);
+    ARROW_CHECK(fd >= 0) << "recv not successful";
+    return fd;
+  } else {
+    return entry->second.fd;
+  }
+}
+
 void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
                                               PlasmaObject* object, bool is_sealed) {
   // Increment the count of the object to track the fact that it is being used.
@@ -357,18 +347,6 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
     objects_in_use_[object_id]->count = 0;
     objects_in_use_[object_id]->is_sealed = is_sealed;
     object_entry = objects_in_use_[object_id].get();
-    if (object->device_num == 0) {
-      // Increment the count of the number of objects in the memory-mapped file
-      // that are being used. The corresponding decrement should happen in
-      // PlasmaClient::Release.
-      auto entry = mmap_table_.find(object->store_fd);
-      ARROW_CHECK(entry != mmap_table_.end());
-      ARROW_CHECK(entry->second.count >= 0);
-      // Update the in_use_object_bytes_.
-      in_use_object_bytes_ +=
-          (object_entry->object.data_size + object_entry->object.metadata_size);
-      entry->second.count += 1;
-    }
   } else {
     object_entry = elem->second.get();
     ARROW_CHECK(object_entry->count > 0);
@@ -397,8 +375,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
   // If the CreateReply included an error, then the store will not send a file
   // descriptor.
   if (device_num == 0) {
-    int fd = recv_fd(store_conn_);
-    ARROW_CHECK(fd >= 0) << "recv not successful";
+    int fd = GetStoreFd(store_fd);
     ARROW_CHECK(object.data_size == data_size);
     ARROW_CHECK(object.metadata_size == metadata_size);
     // The metadata should come right after the data.
@@ -535,8 +512,7 @@ Status PlasmaClient::Impl::GetBuffers(
   // in the subsequent loop based on just the store file descriptor and without
   // having to know the relevant file descriptor received from recv_fd.
   for (size_t i = 0; i < store_fds.size(); i++) {
-    int fd = recv_fd(store_conn_);
-    ARROW_CHECK(fd >= 0);
+    int fd = GetStoreFd(store_fds[i]);
     LookupOrMmap(fd, store_fds[i], mmap_sizes[i]);
   }
 
@@ -615,54 +591,21 @@ Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects,
   return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out);
 }
 
-Status PlasmaClient::Impl::UnmapObject(const ObjectID& object_id) {
+Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) {
   auto object_entry = objects_in_use_.find(object_id);
   ARROW_CHECK(object_entry != objects_in_use_.end());
   ARROW_CHECK(object_entry->second->count == 0);
 
-  // Decrement the count of the number of objects in this memory-mapped file
-  // that the client is using. The corresponding increment should have
-  // happened in plasma_get.
-  int fd = object_entry->second->object.store_fd;
-  auto entry = mmap_table_.find(fd);
-  ARROW_CHECK(entry != mmap_table_.end());
-  ARROW_CHECK(entry->second.count >= 1);
-  if (entry->second.count == 1) {
-    // If no other objects are being used, then unmap the file.
-    // We subtract kMmapRegionsGap from the length that was added
-    // in fake_mmap in malloc.h, to make the size page-aligned again.
-    int err = munmap(entry->second.pointer, entry->second.length - kMmapRegionsGap);
-    if (err == -1) {
-      return Status::IOError("Error during munmap");
-    }
-    // Remove the corresponding entry from the hash table.
-    mmap_table_.erase(fd);
-  } else {
-    // If there are other objects being used, decrement the reference count.
-    entry->second.count -= 1;
-  }
-  // Update the in_use_object_bytes_.
-  in_use_object_bytes_ -= (object_entry->second->object.data_size +
-                           object_entry->second->object.metadata_size);
-  DCHECK_GE(in_use_object_bytes_, 0);
   // Remove the entry from the hash table of objects currently in use.
   objects_in_use_.erase(object_id);
   return Status::OK();
 }
 
-/// This is a helper method for implementing plasma_release. We maintain a
-/// buffer
-/// of release calls and only perform them once the buffer becomes full (as
-/// judged by the aggregate sizes of the objects). There may be multiple release
-/// calls for the same object ID in the buffer. In this case, the first release
-/// calls will not do anything. The client will only send a message to the store
-/// releasing the object when the client is truly done with the object.
-///
-/// @param object_id The object ID to attempt to release.
-Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) {
-  // Decrement the count of the number of instances of this object that are
-  // being used by this client. The corresponding increment should have happened
-  // in PlasmaClient::Get.
+Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
+  // If the client is already disconnected, ignore release requests.
+  if (store_conn_ < 0) {
+    return Status::OK();
+  }
   auto object_entry = objects_in_use_.find(object_id);
   ARROW_CHECK(object_entry != objects_in_use_.end());
   object_entry->second->count -= 1;
@@ -670,7 +613,7 @@ Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) {
   // Check if the client is no longer using this object.
   if (object_entry->second->count == 0) {
     // Tell the store that the client no longer needs the object.
-    RETURN_NOT_OK(UnmapObject(object_id));
+    RETURN_NOT_OK(MarkObjectUnused(object_id));
     RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
     auto iter = deletion_cache_.find(object_id);
     if (iter != deletion_cache_.end()) {
@@ -681,50 +624,6 @@ Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) {
   return Status::OK();
 }
 
-Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
-  // If the client is already disconnected, ignore release requests.
-  if (store_conn_ < 0) {
-    return Status::OK();
-  }
-  // If an object is in the deletion cache, handle it directly without waiting.
-  auto iter = deletion_cache_.find(object_id);
-  if (iter != deletion_cache_.end()) {
-    RETURN_NOT_OK(PerformRelease(object_id));
-    return Status::OK();
-  }
-  // Add the new object to the release history.
-  release_history_.push_front(object_id);
-  // If there are too many bytes in use by the client or if there are too many
-  // pending release calls, and there are at least some pending release calls in
-  // the release_history list, then release some objects.
-
-  // TODO(wap): Eviction policy only works on host memory, and thus objects on
-  // the GPU cannot be released currently.
-  while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 100) ||
-          release_history_.size() > config_.release_delay) &&
-         release_history_.size() > 0) {
-    // Perform a release for the object ID for the first pending release.
-    RETURN_NOT_OK(PerformRelease(release_history_.back()));
-    // Remove the last entry from the release history.
-    release_history_.pop_back();
-  }
-  return Status::OK();
-}
-
-Status PlasmaClient::Impl::FlushReleaseHistory() {
-  // If the client is already disconnected, ignore the flush.
-  if (store_conn_ < 0) {
-    return Status::OK();
-  }
-  while (release_history_.size() > 0) {
-    // Perform a release for the object ID for the first pending release.
-    RETURN_NOT_OK(PerformRelease(release_history_.back()));
-    // Remove the last entry from the release history.
-    release_history_.pop_back();
-  }
-  return Status::OK();
-}
-
 // This method is used to query whether the plasma store contains an object.
 Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) {
   // Check if we already have a reference to the object.
@@ -855,8 +754,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
   ARROW_CHECK(!object_entry->second->is_sealed)
       << "Plasma client called abort on a sealed object";
 
-  // Flush the release history.
-  RETURN_NOT_OK(FlushReleaseHistory());
   // Make sure that the Plasma client only has one reference to the object. If
   // it has more, then the client needs to release the buffer before calling
   // abort.
@@ -868,7 +765,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
   RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id));
   // Decrease the reference count to zero, then remove the object.
   object_entry->second->count--;
-  RETURN_NOT_OK(UnmapObject(object_id));
+  RETURN_NOT_OK(MarkObjectUnused(object_id));
 
   std::vector<uint8_t> buffer;
   ObjectID id;
@@ -878,7 +775,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
 }
 
 Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
-  RETURN_NOT_OK(FlushReleaseHistory());
   std::vector<ObjectID> not_in_use_ids;
   for (auto& object_id : object_ids) {
     // If the object is in used, skip it.
@@ -981,8 +877,10 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
   } else {
     manager_conn_ = -1;
   }
-  config_.release_delay = release_delay;
-  in_use_object_bytes_ = 0;
+  if (release_delay != 0) {
+    ARROW_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect "
+                       << "is deprecated";
+  }
   // Send a ConnectRequest to the store to get its memory capacity.
   RETURN_NOT_OK(SendConnectRequest(store_conn_));
   std::vector<uint8_t> buffer;
@@ -1175,8 +1073,6 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
 
 int PlasmaClient::get_manager_fd() const { return impl_->get_manager_fd(); }
 
-Status PlasmaClient::FlushReleaseHistory() { return impl_->FlushReleaseHistory(); }
-
 bool PlasmaClient::IsInUse(const ObjectID& object_id) {
   return impl_->IsInUse(object_id);
 }
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 9e080b7..514d2bd 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -34,11 +34,6 @@ using arrow::Status;
 
 namespace plasma {
 
-/// We keep a queue of unreleased objects cached in the client until we start
-/// sending release requests to the store. This is to avoid frequently mapping
-/// and unmapping objects and evicting data from processor caches.
-constexpr int64_t kPlasmaDefaultReleaseDelay = 64;
-
 /// Object buffer data structure.
 struct ObjectBuffer {
   /// The data buffer.
@@ -62,13 +57,12 @@ class ARROW_EXPORT PlasmaClient {
   /// \param manager_socket_name The name of the UNIX domain socket to use to
   ///        connect to the local Plasma manager. If this is "", then this
   ///        function will not connect to a manager.
-  /// \param release_delay Number of released objects that are kept around
-  ///        and not evicted to avoid too many munmaps.
+  /// \param release_delay Deprecated (not used).
   /// \param num_retries number of attempts to connect to IPC socket, default 50
   /// \return The return status.
   Status Connect(const std::string& store_socket_name,
-                 const std::string& manager_socket_name,
-                 int release_delay = kPlasmaDefaultReleaseDelay, int num_retries = -1);
+                 const std::string& manager_socket_name, int release_delay = 0,
+                 int num_retries = -1);
 
   /// Create an object in the Plasma Store. Any metadata for this object must be
   /// be passed in when the object is created.
@@ -354,10 +348,6 @@ class ARROW_EXPORT PlasmaClient {
   FRIEND_TEST(TestPlasmaStore, LegacyGetTest);
   FRIEND_TEST(TestPlasmaStore, AbortTest);
 
-  /// This is a helper method that flushes all pending release calls to the
-  /// store.
-  Status FlushReleaseHistory();
-
   bool IsInUse(const ObjectID& object_id);
 
   class ARROW_NO_EXPORT Impl;
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index ae658d7..f6326cc 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -327,7 +327,12 @@ void PlasmaStore::ReturnFromGet(GetRequest* get_req) {
   if (s.ok()) {
     // Send all of the file descriptors for the present objects.
     for (int store_fd : store_fds) {
-      WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd);
+      // Only send the file descriptor if it hasn't been sent (see analogous
+      // logic in GetStoreFd in client.cc).
+      if (get_req->client->used_fds.find(store_fd) == get_req->client->used_fds.end()) {
+        WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd);
+        get_req->client->used_fds.insert(store_fd);
+      }
     }
   }
 
@@ -783,8 +788,12 @@ Status PlasmaStore::ProcessMessage(Client* client) {
       HANDLE_SIGPIPE(
           SendCreateReply(client->fd, object_id, &object, error_code, mmap_size),
           client->fd);
-      if (error_code == PlasmaError::OK && device_num == 0) {
+      // Only send the file descriptor if it hasn't been sent (see analogous
+      // logic in GetStoreFd in client.cc). Similar in ReturnFromGet.
+      if (error_code == PlasmaError::OK && device_num == 0 &&
+          client->used_fds.find(object.store_fd) == client->used_fds.end()) {
         WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd);
+        client->used_fds.insert(object.store_fd);
       }
     } break;
     case fb::MessageType::PlasmaCreateAndSealRequest: {
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 8d3facd..0e0eb83 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -54,6 +54,9 @@ struct Client {
   /// Object ids that are used by this client.
   std::unordered_set<ObjectID> object_ids;
 
+  /// File descriptors that are used by this client.
+  std::unordered_set<int> used_fds;
+
   /// The file descriptor used to push notifications to client. This is only valid
   /// if client subscribes to plasma store. -1 indicates invalid.
   int notification_fd;
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index f820303..65a9b71 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -82,7 +82,7 @@ class TestPlasmaStore : public ::testing::Test {
 
   void CreateObject(PlasmaClient& client, const ObjectID& object_id,
                     const std::vector<uint8_t>& metadata,
-                    const std::vector<uint8_t>& data) {
+                    const std::vector<uint8_t>& data, bool release = true) {
     std::shared_ptr<Buffer> data_buffer;
     ARROW_CHECK_OK(client.Create(object_id, data.size(), &metadata[0], metadata.size(),
                                  &data_buffer));
@@ -90,7 +90,9 @@ class TestPlasmaStore : public ::testing::Test {
       data_buffer->mutable_data()[i] = data[i];
     }
     ARROW_CHECK_OK(client.Seal(object_id));
-    ARROW_CHECK_OK(client.Release(object_id));
+    if (release) {
+      ARROW_CHECK_OK(client.Release(object_id));
+    }
   }
 
   const std::string& GetStoreSocketName() const { return store_socket_name_; }
@@ -155,11 +157,12 @@ TEST_F(TestPlasmaStore, SealErrorsTest) {
 
   // Create object.
   std::vector<uint8_t> data(100, 0);
-  CreateObject(client_, object_id, {42}, data);
+  CreateObject(client_, object_id, {42}, data, false);
 
   // Trying to seal it again.
   result = client_.Seal(object_id);
   ASSERT_TRUE(result.IsPlasmaObjectAlreadySealed());
+  ARROW_CHECK_OK(client_.Release(object_id));
 }
 
 TEST_F(TestPlasmaStore, DeleteTest) {
@@ -228,13 +231,7 @@ TEST_F(TestPlasmaStore, DeleteObjectsTest) {
   // client2_ won't send the release request immediately because the trigger
   // condition is not reached. The release is only added to release cache.
   object_buffers.clear();
-  // The reference count went to zero, but the objects are still in the release
-  // cache.
-  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
-  ASSERT_TRUE(has_object);
-  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
-  ASSERT_TRUE(has_object);
-  // The Delete call will flush release cache and send the Delete request.
+  // Delete the objects.
   result = client2_.Delete(std::vector<ObjectID>{object_id1, object_id2});
   ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
   ASSERT_FALSE(has_object);
@@ -277,7 +274,6 @@ TEST_F(TestPlasmaStore, GetTest) {
   // First create object.
   std::vector<uint8_t> data = {3, 5, 6, 7, 9};
   CreateObject(client_, object_id, {42}, data);
-  ARROW_CHECK_OK(client_.FlushReleaseHistory());
   EXPECT_FALSE(client_.IsInUse(object_id));
 
   object_buffers.clear();
@@ -291,11 +287,9 @@ TEST_F(TestPlasmaStore, GetTest) {
     auto metadata = object_buffers[0].metadata;
     object_buffers.clear();
     ::arrow::AssertBufferEqual(*metadata, std::string{42});
-    ARROW_CHECK_OK(client_.FlushReleaseHistory());
     EXPECT_TRUE(client_.IsInUse(object_id));
   }
   // Object is automatically released
-  ARROW_CHECK_OK(client_.FlushReleaseHistory());
   EXPECT_FALSE(client_.IsInUse(object_id));
 }
 
@@ -314,17 +308,14 @@ TEST_F(TestPlasmaStore, LegacyGetTest) {
     // First create object.
     std::vector<uint8_t> data = {3, 5, 6, 7, 9};
     CreateObject(client_, object_id, {42}, data);
-    ARROW_CHECK_OK(client_.FlushReleaseHistory());
     EXPECT_FALSE(client_.IsInUse(object_id));
 
     ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
     AssertObjectBufferEqual(object_buffer, {42}, {3, 5, 6, 7, 9});
   }
   // Object needs releasing manually
-  ARROW_CHECK_OK(client_.FlushReleaseHistory());
   EXPECT_TRUE(client_.IsInUse(object_id));
   ARROW_CHECK_OK(client_.Release(object_id));
-  ARROW_CHECK_OK(client_.FlushReleaseHistory());
   EXPECT_FALSE(client_.IsInUse(object_id));
 }
 
@@ -377,11 +368,9 @@ TEST_F(TestPlasmaStore, AbortTest) {
   ASSERT_TRUE(status.IsInvalid());
   // Release, then abort.
   ARROW_CHECK_OK(client_.Release(object_id));
-  ARROW_CHECK_OK(client_.FlushReleaseHistory());
   EXPECT_TRUE(client_.IsInUse(object_id));
 
   ARROW_CHECK_OK(client_.Abort(object_id));
-  ARROW_CHECK_OK(client_.FlushReleaseHistory());
   EXPECT_FALSE(client_.IsInUse(object_id));
 
   // Test for object non-existence after the abort.
@@ -394,7 +383,6 @@ TEST_F(TestPlasmaStore, AbortTest) {
   // Test that we can get the object.
   ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
   AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5});
-  ARROW_CHECK_OK(client_.Release(object_id));
 }
 
 TEST_F(TestPlasmaStore, MultipleClientTest) {
diff --git a/docs/source/python/plasma.rst b/docs/source/python/plasma.rst
index 09837cf..3df68ef 100644
--- a/docs/source/python/plasma.rst
+++ b/docs/source/python/plasma.rst
@@ -60,7 +60,7 @@ socket name:
 .. code-block:: python
 
   import pyarrow.plasma as plasma
-  client = plasma.connect("/tmp/plasma", "", 0)
+  client = plasma.connect("/tmp/plasma", "")
 
 If the following error occurs from running the above Python code, that
 means that either the socket given is incorrect, or the ``./plasma_store`` is
@@ -68,7 +68,7 @@ not currently running. Check to see if the Plasma store is still running.
 
 .. code-block:: shell
 
-  >>> client = plasma.connect("/tmp/plasma", "", 0)
+  >>> client = plasma.connect("/tmp/plasma", "")
   Connection to socket failed for pathname /tmp/plasma
   Could not connect to socket /tmp/plasma
 
@@ -179,7 +179,7 @@ the object buffer.
 
   # Create a different client. Note that this second client could be
   # created in the same or in a separate, concurrent Python session.
-  client2 = plasma.connect("/tmp/plasma", "", 0)
+  client2 = plasma.connect("/tmp/plasma", "")
 
   # Get the object in the second client. This blocks until the object has been sealed.
   object_id2 = plasma.ObjectID(20 * b"a")
@@ -221,7 +221,7 @@ of the object info might change in the future):
   import pyarrow.plasma as plasma
   import time
 
-  client = plasma.connect("/tmp/plasma", "", 0)
+  client = plasma.connect("/tmp/plasma", "")
 
   client.put("hello, world")
   # Sleep a little so we get different creation times
@@ -452,7 +452,7 @@ You can test this with the following script:
   import pyarrow.plasma as plasma
   import time
 
-  client = plasma.connect("/tmp/plasma", "", 0)
+  client = plasma.connect("/tmp/plasma", "")
 
   data = np.random.randn(100000000)
   tensor = pa.Tensor.from_numpy(data)
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 2fad09c..f7db3b4 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -30,10 +30,11 @@ from cython.operator cimport dereference as deref, preincrement as inc
 from cpython.pycapsule cimport *
 
 import collections
-import pyarrow
 import random
 import socket
+import warnings
 
+import pyarrow
 from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer
 from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
                                         CFixedSizeBufferWriter, CStatus)
@@ -872,7 +873,7 @@ cdef class PlasmaClient:
         return result
 
 
-def connect(store_socket_name, manager_socket_name, int release_delay,
+def connect(store_socket_name, manager_socket_name, int release_delay=0,
             int num_retries=-1):
     """
     Return a new PlasmaClient that is connected a plasma store and
@@ -885,8 +886,7 @@ def connect(store_socket_name, manager_socket_name, int release_delay,
     manager_socket_name : str
         Name of the socket the plasma manager is listening at.
     release_delay : int
-        The maximum number of objects that the client will keep and
-        delay releasing (for caching reasons).
+        This parameter is deprecated and has no effect.
     num_retries : int, default -1
         Number of times to try to connect to plasma store. Default value of -1
         uses the default (50)
@@ -894,6 +894,9 @@ def connect(store_socket_name, manager_socket_name, int release_delay,
     cdef PlasmaClient result = PlasmaClient()
     result.store_socket_name = store_socket_name.encode()
     result.manager_socket_name = manager_socket_name.encode()
+    if release_delay != 0:
+        warnings.warn("release_delay in PlasmaClient.connect is deprecated",
+                      FutureWarning)
     with nogil:
         check_status(result.client.get()
                      .Connect(result.store_socket_name,
diff --git a/python/pyarrow/tensorflow/plasma_op.cc b/python/pyarrow/tensorflow/plasma_op.cc
index a341d5a..4e6449a 100644
--- a/python/pyarrow/tensorflow/plasma_op.cc
+++ b/python/pyarrow/tensorflow/plasma_op.cc
@@ -77,7 +77,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
     if (!connected_) {
       VLOG(1) << "Connecting to Plasma...";
       ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                     plasma_manager_socket_name_, 0));
+                                     plasma_manager_socket_name_));
       VLOG(1) << "Connected!";
       connected_ = true;
     }
@@ -249,7 +249,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
     if (!connected_) {
       VLOG(1) << "Connecting to Plasma...";
       ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                     plasma_manager_socket_name_, 0));
+                                     plasma_manager_socket_name_));
       VLOG(1) << "Connected!";
       connected_ = true;
     }
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index e3d31b7..66449e6 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -121,8 +121,8 @@ class TestPlasmaClient(object):
             use_one_memory_mapped_file=use_one_memory_mapped_file)
         self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
         # Connect to Plasma.
-        self.plasma_client = plasma.connect(self.plasma_store_name, "", 64)
-        self.plasma_client2 = plasma.connect(self.plasma_store_name, "", 0)
+        self.plasma_client = plasma.connect(self.plasma_store_name, "")
+        self.plasma_client2 = plasma.connect(self.plasma_store_name, "")
 
     def teardown_method(self, test_method):
         try:
@@ -948,7 +948,7 @@ def test_use_huge_pages():
             plasma_store_memory=2*10**9,
             plasma_directory="/mnt/hugepages",
             use_hugepages=True) as (plasma_store_name, p):
-        plasma_client = plasma.connect(plasma_store_name, "", 64)
+        plasma_client = plasma.connect(plasma_store_name, "")
         create_object(plasma_client, 10**8)
 
 
@@ -962,7 +962,7 @@ def test_plasma_client_sharing():
     with plasma.start_plasma_store(
             plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
             as (plasma_store_name, p):
-        plasma_client = plasma.connect(plasma_store_name, "", 64)
+        plasma_client = plasma.connect(plasma_store_name, "")
         object_id = plasma_client.put(np.zeros(3))
         buf = plasma_client.get(object_id)
         del plasma_client
diff --git a/python/pyarrow/tests/test_plasma_tf_op.py b/python/pyarrow/tests/test_plasma_tf_op.py
index d9bf915..51e8b28 100644
--- a/python/pyarrow/tests/test_plasma_tf_op.py
+++ b/python/pyarrow/tests/test_plasma_tf_op.py
@@ -94,7 +94,7 @@ def test_plasma_tf_op(use_gpu=False):
         pytest.skip("TensorFlow Op not found")
 
     with plasma.start_plasma_store(10**8) as (plasma_store_name, p):
-        client = plasma.connect(plasma_store_name, "", 0)
+        client = plasma.connect(plasma_store_name, "")
         for dtype in [np.float32, np.float64,
                       np.int8, np.int16, np.int32, np.int64]:
             run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,